
envelope's Introduction

Envelope

Envelope is a configuration-driven framework for Apache Spark that makes it easy to develop Spark-based data processing pipelines.

Envelope is simply a pre-made Spark application that implements many of the tasks commonly found in ETL pipelines. In many cases, Envelope allows large pipelines to be developed on Spark with no coding required. When custom code is needed, there are pluggable points in Envelope for core functionality to be extended. Envelope works in batch and streaming modes.

Some examples of what you can easily do with Envelope:

  • Run a graph of Spark SQL queries, all in the memory of a single Spark job
  • Stream in event data from Apache Kafka, join to reference data, and write to Apache Kudu
  • Read in from an RDBMS table and write to Apache Parquet files on HDFS
  • Automatically merge into slowly changing dimensions (Type 1 and 2, and bi-temporal)
  • Insert custom DataFrame transformation logic for executing complex business rules

Get started

Requirements

Envelope requires Apache Spark 2.1.0 or above.

Additionally, if using these components, Envelope requires:

  • Apache Kafka 0.10 or above
  • Apache Kudu 1.4.0 or above
  • Apache HBase 1.2.0 or above
  • Apache ZooKeeper 3.4.5 or above
  • Apache Impala 2.7.0 or above

For Cloudera CDH 5, Kafka requires Cloudera's Kafka 2.1.0 or above, HBase and ZooKeeper require CDH 5.7 or above, and Impala requires CDH 5.9 or above. For Cloudera CDH 6, CDH 6.0 or above is required.

Downloading Envelope

Envelope and its dependencies can be downloaded as a single jar file from the GitHub repository Releases page.

Compiling Envelope

Alternatively, you can build the Envelope application from the top-level directory of the source code by running the Maven command:

mvn clean install

This will create envelope-0.7.0.jar in the build/envelope/target directory.
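
If you only need the jar and want a faster build, tests can be skipped with the standard Maven flag (a general Maven option, not an Envelope-specific requirement):

mvn clean install -DskipTests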

Finding examples

Envelope provides these example pipelines that you can run for yourself:

  • FIX: simulates receiving financial orders and executions and tracking the history of the orders over time.
    • This example includes a walkthrough that explains in detail how it meets the requirements.
  • FIX HBase: simulates receiving financial orders and executions and tracking the history of the orders over time in HBase.
  • Traffic: simulates receiving traffic conditions and calculating an aggregate view of traffic congestion.
  • Filesystem: demonstrates a batch job that reads a JSON file from HDFS and writes the data back to Avro files on HDFS.
  • Cloudera Navigator: implements a streaming job to ingest audit events from Cloudera Navigator into Kudu, HDFS and Solr.
  • Impala DDL: demonstrates updating Impala metadata, such as adding partitions and refreshing table metadata.

Running Envelope

You can run Envelope by submitting it to Spark with the configuration file for your pipeline:

spark-submit envelope-0.7.0.jar your_pipeline.conf
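
The configuration file is in HOCON format and describes the pipeline as a set of steps. As a rough illustrative sketch only (the step names, paths, and query below are made up, and the exact keys should be checked against the Configuration Specification), a small batch pipeline that reads JSON from HDFS, runs a SQL derivation, and writes Parquet might look something like:

application {
  name = Example Pipeline
}

steps {
  fs_input {
    input {
      type = filesystem
      path = /data/example/input
      format = json
    }
  }

  fs_output {
    dependencies = [fs_input]
    deriver {
      type = sql
      query.literal = "SELECT * FROM fs_input"
    }
    planner {
      type = overwrite
    }
    output {
      type = filesystem
      path = /data/example/output
      format = parquet
    }
  }
}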

Note: CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.

A helpful place to monitor your running pipeline is from the Spark UI for the job. You can find this via the YARN ResourceManager UI.

More information

If you are ready for more, dive in:

  • User Guide - details on the design, operations, configuration, and usage of Envelope
  • Configuration Specification - a deep-dive into the configuration options of Envelope
  • Inputs Guide - detailed information on each provided input, and how to write custom inputs
  • Derivers Guide - detailed information on each provided deriver, and how to write custom derivers
  • Planners Guide - directions and details on when, why, and how to use planners and associated outputs
  • Looping Guide - information and an example for defining loops in an Envelope pipeline
  • Decisions Guide - information on using decisions to dynamically choose which parts of the pipeline to run
  • Tasks Guide - how to apply side-effects in an Envelope pipeline that are separate from the data flow
  • Security Guide - how to run Envelope in secure cluster configurations
  • Repetitions Guide - how to re-run cached steps in a streaming job based on provided criteria
  • Events Guide - how to handle Envelope application lifecycle events

envelope's People

Contributors

alexvdedov, asdaraujo, ashishtyagicse, curtishoward, jeremybeard, jrkinley-zz, mansim07, mfernest, rogerding, romainr, wmudge

envelope's Issues

Bi-temporal planner for other stores

Hi, I'm embarking on a new project on Cloudera which requires bi-temporal storage, and I luckily stumbled upon Envelope. I'm so excited to see that this framework exists. I see that bi-temporality is only offered for Kudu. Any plans to extend this to other stores?

Thanks
Manoj

Can Envelope run nested loops?

Hi,
Can Envelope run nested loops?

For example, how would the following logic be implemented in Envelope?

for (String str : values) {
  for (String str1 : values2) {
    somestep(str1, str);
  }
}
Add JmsQueue Source

A very common and frequently required queue source is JMS.
Adding it would be most helpful!

Tasks Not working

Envelope tasks are not working: the step is not able to find the class key, and instead it looks for a "task" key that accepts ConfigValueType OBJECT.

fail_test {
  type = task
  class = exception
  message = "Test for XYC did not match expected results"
}

Spark Envelope job gets stuck in case an exception is thrown

I was running Spark Envelope in YARN client mode and noticed that after the config is validated and the job is submitted, if an exception is thrown from the Spark framework then the job gets stuck and does not exit.

I made a small change in the com.cloudera.labs.envelope.run.Runner class and it was fixed. I wrapped the existing logic in a try/finally block:

try {
  if (StepUtils.hasStreamingStep(steps)) {
    LOG.debug("Streaming step(s) identified");

    runStreaming(steps);
  }
  else {
    LOG.debug("No streaming steps identified");

    runBatch(steps);
  }
}
finally {
  shutdownThreadPool();
}

Please check this issue.

Envelope cannot run in YARN cluster mode

spark2-submit envelope*.jar xxx.conf

It cannot run in YARN cluster mode, because the Spark-related configuration is in the "application" section of the xxx.conf file, so spark-submit cannot pick it up at startup. In other words, the driver starts locally instead of on a node in the cluster.
So if we want to run through YARN in cluster mode, we need to extract the "application" section from the xxx.conf file into a separate key-value configuration file, and then add that file to the spark2-submit command, e.g.:
spark2-submit --conf application.conf envelope*.jar xxx.conf

The above is my suggestion. I don't know if I have misinterpreted the program. Please advise.
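
As general context, standard spark-submit already has options for supplying Spark settings outside the pipeline configuration and for shipping the configuration file to a cluster-mode driver. A hedged sketch of that pattern (the file names are illustrative, and this is not necessarily how the project resolved the issue):

spark2-submit \
  --deploy-mode cluster \
  --properties-file spark.properties \
  --files your_pipeline.conf \
  envelope-0.7.0.jar your_pipeline.conf

Here spark.properties would hold entries such as spark.executor.memory=4g, and --files makes your_pipeline.conf available in the driver's working directory on the cluster.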

pass Config to custom HBaseSerde

Hello

com.cloudera.labs.envelope.utils.hbase.HBaseUtils
// HBaseSerde util
public static HBaseSerde getSerde(Config config) {
  HBaseSerde serde;
  if (config.hasPath(SERDE_PROPERTY)) {
    String serdeImpl = config.getString(SERDE_PROPERTY);
    if (serdeImpl.equals(DEFAULT_SERDE_PROPERTY)) {
      return new DefaultHBaseSerde();
    } else {
      try {
        Class clazz = Class.forName(serdeImpl);
        Constructor constructor = clazz.getConstructor();
        return (HBaseSerde) constructor.newInstance();
      } catch (Exception e) {
        LOG.error("Could not construct custom HBaseSerde instance [" + serdeImpl + "]: " + e);
        throw new RuntimeException(e);
      }
    }
  } else {
    serde = new DefaultHBaseSerde();
  }
  serde.configure(config);

  return serde;
}

In the case of a custom serde, serde.configure(config) is never called, so there is no way to access the config in the provided serde.

According to the comment below, configure(config) should be called:
com.cloudera.labs.envelope.utils.hbase.HBaseSerde
/**
 * Configure the Serde.
 * This will be passed the contents of an "input" or "output" configuration section.
 * @param config
 */
void configure(Config config);
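
A minimal sketch of the change being requested, assuming the intent is simply to configure a custom serde the same way the default path already does (this is the reporter's suggestion, not necessarily how the project fixed it):

try {
  Class clazz = Class.forName(serdeImpl);
  Constructor constructor = clazz.getConstructor();
  HBaseSerde customSerde = (HBaseSerde) constructor.newInstance();
  // Pass the input/output config section to the custom serde before returning,
  // mirroring the serde.configure(config) call on the default serde path
  customSerde.configure(config);
  return customSerde;
} catch (Exception e) {
  LOG.error("Could not construct custom HBaseSerde instance [" + serdeImpl + "]: " + e);
  throw new RuntimeException(e);
}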

NPE on RegexRowRule when a row field is empty

Regex rule fails with an NPE if a row contains an empty field and the field must be checked.

18/04/11 17:57:52 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, ****.****.**, executor 4): java.lang.NullPointerException
        at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
        at java.util.regex.Matcher.reset(Matcher.java:309)
        at java.util.regex.Matcher.<init>(Matcher.java:229)
        at java.util.regex.Pattern.matcher(Pattern.java:1093)
        at com.cloudera.labs.envelope.derive.dq.RegexRowRule.check(RegexRowRule.java:55)

The class com.cloudera.labs.envelope.derive.dq.RegexRowRule needs a null check in the method check(Row row): boolean:

  @Override
  public boolean check(Row row) {
    boolean check = true;
    for (String field : fields) {
      String value = row.getAs(field);
      Matcher matcher = pattern.matcher(value);
      check = check && matcher.matches();
      if (!check) {
        // No point continuing if failed
        break;
      }
    }
    return check;
  }
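
A minimal sketch of the null guard being asked for, assuming a null field value should be treated as a failed check (skipping the field would be an equally valid policy):

  @Override
  public boolean check(Row row) {
    boolean check = true;
    for (String field : fields) {
      String value = row.getAs(field);
      if (value == null) {
        // Avoid the NPE in Pattern.matcher(null): count a missing value as a failure
        check = false;
        break;
      }
      Matcher matcher = pattern.matcher(value);
      check = check && matcher.matches();
      if (!check) {
        // No point continuing if failed
        break;
      }
    }
    return check;
  }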

Decisions guide document

Hello,

In the Decisions guide document, the example for "Step by value" appears to need correcting. The guide currently shows:

decide {
  dependencies = [aggregate]
  type = decision
  if.true.steps = [run_if_true]
  decision.method = step_by_key
  step = generate
  key = test1
}

decision.method = "step_by_value"
step ="aggregate".

Thanks
Bala

spaces in fields name

Hi, I'm trying to use Envelope to read one CSV file and, after some checks on field formats, write two CSV files: the first one with "ok data" and the second one with "ko data". My CSV source has field names in the first row, but these names contain spaces. How can I write "select [field 1], [field 2] from StepOfChecks where..." with query.literal? Thanks
P.S.: I'm not sure that this is the right way to ask questions; if not, please redirect me, thanks.
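
For what it's worth, Spark SQL generally allows identifiers that contain spaces to be quoted with backticks, so a query.literal along these lines may work (illustrative only; the step name StepOfChecks is taken from the question):

query.literal = "SELECT `field 1`, `field 2` FROM StepOfChecks WHERE ..."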

org.apache.spark.internal.Logging issue

Hi,
I am using Spark 2.4; please help me soon.
I am getting an error about org.apache.spark.internal.Logging:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.dell.sadatahub.azure.utils.SingletonSparkSession.getSparkSession(SingletonSparkSession.scala:26)
at com.dell.sadatahub.azure.DDVJsonParser$.main(DDVJsonParser.scala:15)
at com.dell.sadatahub.azure.DDVJsonParser.main(DDVJsonParser.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more

CDH Env requirements.

Hi Jared. I think the project is an excellent tool that encapsulates lots of coding work behind the scenes. Currently, my company has already started using Spark 2 and Kafka 0.10 along with CDH 5.10, without Kudu. In the doc you wrote about using it along with Kafka 0.9. Will the version difference (as well as the Maven dependencies) cause an issue? Besides, do you plan, at some point, to upgrade the project to Spark 2?

Impala DDL task authentication

The impala_ddl task is only able to authenticate with Impala using a keytab file local to the driver process. It would be nice to have the option to use the Kerberos ticket cache to create a LoginContext. The Simba Impala JDBC Driver supports this option by setting KrbAuthType=2.

Aside from Kudu, what makes this project require CDH?

I think this project is a great idea! There are many simple, common ETL-like tasks, that you'd not want to write a complete Spark job for, but instead just configure what you want. Envelope seems to do just that.
It seems, however, that this should not be limited to clusters running CDH. I can imagine this running on Hortonworks or MapR clusters just fine, except for the Kudu parts, that is.

Care to comment?

UDF in YARN mode does not work, and gets an error

SQL:select 'tb_user_info' as tableName,count(1) as totalNum,statistics(result) as statis,DATE_FORMAT(register_time,'%Y-%m') as date,home_city as city from jdbcProcess group by DATE_FORMAT(register_time,'%Y-%m'),home_city,statis

Here statistics is a custom UDF method. When I run the program in local mode, there is no error, as shown below:

=== Result of Batch Resolution ===
!'Aggregate ['DATE_FORMAT('register_time, %Y-%m), 'home_city, 'stat], [tb_user_info AS tableName#118, 'count(1) AS totalNum#119, 'statistics('result) AS stat#120, 'DATE_FORMAT('register_time, %Y-%m) AS date#121, 'home_city AS city#122] Aggregate [date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)), home_city#64, UDF(result#68) AS stat#120], [tb_user_info AS tableName#118, count(1) AS totalNum#119L, UDF(result#68) AS stat#120, date_format(register_time#67, %Y-%m, Some(Asia/Shanghai)) AS date#121, home_city#64 AS city#122]
!+- 'UnresolvedRelation jdbcProcess +- SubqueryAlias jdbcprocess
! +- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#60, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#61, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#62, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#63, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#64, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#65, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#66, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#67, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#68, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#69]
! +- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@1d921d, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#59: org.apache.spark.sql.Row
! +- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#58: org.apache.spark.sql.Row
! +- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
! +- Filter (age#5 > 10)
! +- SubqueryAlias jdbcinput
!
But when I ran the program in YARN mode, I got the following error.

18/08/28 22:56:59 INFO SparkSqlParser: Parsing command: statsProcess3
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.cloudera.labs.envelope.run.Runner.awaitAllOffMainThreadsFinished(Runner.java:332)
at com.cloudera.labs.envelope.run.Runner.runBatch(Runner.java:300)
at com.cloudera.labs.envelope.run.Runner.run(Runner.java:93)
at com.cloudera.labs.envelope.EnvelopeMain.main(EnvelopeMain.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 'statis' given input columns: [jobId, age, birthday, user_name, user_sex, result, home_city, user_id, register_time, phone]; line 1 pos 217;
'Aggregate [date_format(register_time#47, %Y-%m), home_city#44, 'statis], [tb_user_info AS tableName#98, count(1) AS totalNum#99L, UDF(result#48) AS statis#100, date_format(register_time#47, %Y-%m) AS date#101, home_city#44 AS city#102]
+- SubqueryAlias jdbcprocess
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), IntegerType) AS user_id#40, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_name), StringType), true) AS user_name#41, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, user_sex), IntegerType) AS user_sex#42, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, phone), StringType), true) AS phone#43, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, home_city), StringType), true) AS home_city#44, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, age), IntegerType) AS age#45, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, birthday), TimestampType), true) AS birthday#46, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, register_time), TimestampType), true) AS register_time#47, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, result), StringType), true) AS result#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, jobId), StringType), true) AS jobId#49]
+- MapElements com.cloudera.labs.envelope.derive.DataQualityDeriver$CheckRowRules@4a6d6e4, interface org.apache.spark.sql.Row, [StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)], obj#39: org.apache.spark.sql.Row
+- DeserializeToObject createexternalrow(user_id#0, user_name#1.toString, user_sex#2, phone#3.toString, home_city#4.toString, age#5, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, birthday#6, true), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, register_time#7, true), StructField(user_id,IntegerType,true), StructField(user_name,StringType,true), StructField(user_sex,IntegerType,true), StructField(phone,StringType,true), StructField(home_city,StringType,true), StructField(age,IntegerType,true), StructField(birthday,TimestampType,true), StructField(register_time,TimestampType,true)), obj#38: org.apache.spark.sql.Row
+- Project [user_id#0, user_name#1, user_sex#2, phone#3, home_city#4, age#5, birthday#6, register_time#7]
+- Filter (age#5 > 10)
+- SubqueryAlias jdbcinput
+- Relation[user_id#0,user_name#1,user_sex#2,phone#3,home_city#4,age#5,birthday#6,register_time#7] JDBCRelation(tb_user_info) [numPartitions=1]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:86)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:266)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:276)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:280)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:280)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:285)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:255)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:83)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at com.cloudera.labs.envelope.derive.SQLDeriver.derive(SQLDeriver.java:69)
at com.cloudera.labs.envelope.run.BatchStep.submit(BatchStep.java:84)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:324)
at com.cloudera.labs.envelope.run.Runner$2.call(Runner.java:321)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This is the Java code I use to register the UDF:

        String name = udfConfig.getString("name");
        String className = udfConfig.getString("class");

        // null third argument means that registerJava will infer the return type
        Contexts.getSparkSession().udf().registerJava(name, className, null);

The UDF's Java code is below:

public class Statistics implements UDF1<String, Integer>, ProvidesAlias {
  @Override
  public Integer call(String s) {
    String[] units = s.split(",", -1);
    return units.length;
  }

  @Override
  public String getAlias() {
    return "statistics";
  }
}
I hope someone can tell me what the reason is. Thanks!

NPE When access Spark Session in getExistingForFilters of RandomOutput

When trying to implement getExistingForFilters of RandomOutput and access the Spark session, I got a NullPointerException.

Code like this:

val list = Contexts.getSparkSession().read().jdbc(url, "($query) t999", properties).collectAsList()

Exception log like this:

19/08/09 17:06:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:56)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics$lzycompute(WholeStageCodegenExec.scala:511)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.metrics(WholeStageCodegenExec.scala:510)
	at org.apache.spark.sql.execution.SparkPlan.resetMetrics(SparkPlan.scala:85)
	at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3361)
	at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3360)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
	at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2794)
	at com.thothinfo.ipd.dataflow.extension.output.JdbcExOutput.getExistingForFilters(JdbcExOutput.kt:248)
	at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:639)
	at com.cloudera.labs.envelope.run.DataStep$JoinExistingForKeysFunction.call(DataStep.java:598)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging

But when I tried to run the examples/filesystem example, I found the following error. The pom.xml file in the project has not been changed, and the spark-streaming and spark-streaming-kafka version is 2.2.0.cloudera2. Can someone tell me why?
Error:
[root@hadoopmaster cqx_dir]# spark-submit envelope-0.5.0.jar filesystem.conf
Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
Spark1 will be picked by default
18/07/14 15:38:38 INFO EnvelopeMain: Envelope application started
18/07/14 15:38:39 INFO EnvelopeMain: Configuration loaded
18/07/14 15:38:39 INFO Runner: Steps instantiated
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.cloudera.labs.envelope.spark.Contexts.initializeBatchJob(Contexts.java:134)
at com.cloudera.labs.envelope.spark.Contexts.getSparkSession(Contexts.java:66)
at com.cloudera.labs.envelope.spark.Accumulators.(Accumulators.java:50)
at com.cloudera.labs.envelope.run.Runner.initializeAccumulators(Runner.java:364)
at com.cloudera.labs.envelope.run.Runner.run(Runner.java:81)
at com.cloudera.labs.envelope.EnvelopeMain.main(EnvelopeMain.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:750)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
