vesoft-inc / nebula-exchange

NebulaGraph Exchange is an Apache Spark application to parse data from different sources to NebulaGraph in a distributed environment. It supports both batch and streaming data in various formats and sources including other Graph Databases, RDBMS, Data warehouses, NoSQL, Message Bus, File systems, etc.

License: Apache License 2.0

Scala 99.56% Java 0.44%
data-import data-pipeline etl graph-database hacktoberfest nebulagraph spark

nebula-exchange's People

Contributors

amber1990zhang, bojackli, codelone, cooper-lzy, darionyaphet, diligencelai, dutor, eldinzhou, ggzone, guojun85, haoxins, harrischu, ianhhhhhhhhe, jievince, jude-zhu, laura-ding, nicole00, oldlady344, randomjoe211, riverzzz, shinji-ikarig, sophie-xie, sworduo, wey-gu, whitewum, xiajingchun, yixinglu, zhongqishang


nebula-exchange's Issues

[WIP] Fix the error "Field "xxx" does not exist." when data is imported from Kafka to Nebula Graph via Nebula Exchange

The bug was reported by a user on the Chinese forum: https://discuss.nebula-graph.com.cn/t/topic/2623/

For those who cannot understand Chinese well, please refer to the title of the issue for a basic background. The reason why the error occurs is that Nebula Exchange is not able to parse the "value" field of the data in Kafka.

Committer @guojun85 is working on this now. Thanks in advance for his contribution!

Support IGNORE_EXISTED_INDEX

Introduction

When indexes are defined in the space schema, the performance of inserting new data is impacted significantly; the more indexes are defined, the worse the performance becomes.

Can we add a flag in Exchange to enable the IGNORE_EXISTED_INDEX feature that is supported by INSERT VERTEX and INSERT EDGE, as below:

INSERT VERTEX IGNORE_EXISTED_INDEX person(id) ...
INSERT EDGE IGNORE_EXISTED_INDEX like(grade) ...
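
A hedged sketch of how such a flag could be surfaced in the tag/edge config; the key name ignoreExistedIndex is hypothetical and is not an existing Exchange option:

  tags: [
    {
      name: person
      type: {
        source: csv
        sink: client
      }
      # hypothetical switch: when true, Exchange would generate
      # INSERT VERTEX IGNORE_EXISTED_INDEX ... statements
      ignoreExistedIndex: true
    }
  ]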

Contents

Related work

support the vid with a specific string prefix

Support concatenating a specific string prefix to the vertex id or to the edge source/target id.

Scenario:
The source data contains identical ids across different tags. To ensure that vertices of different tags end up as independent vertices, the source data needs to be preprocessed, i.e. a specific string prefix is concatenated to the original vid (see the sketch below).
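
A hedged sketch of what such a config could look like; the prefix keys below are hypothetical and are not guaranteed to match any existing Exchange option:

  vertex: {
    field: id
    # hypothetical: the value written to Nebula becomes "person_" + id
    prefix: "person_"
  }

  edge: {
    source: {
      field: src_id
      prefix: "person_"    # hypothetical
    }
    target: {
      field: dst_id
      prefix: "company_"   # hypothetical
    }
  }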

Vulnerable shared libraries might make nebula-exchange vulnerable. Can you help upgrade to patch versions?

Hi, @Nicole00 , @cooper-lzy , I'd like to report a vulnerability issue in com.vesoft:nebula-exchange:2.6.3.

Issue Description

com.vesoft:nebula-exchange:2.6.3 directly or transitively depends on 52 C libraries (.so) across many platforms (such as x86-64, x86, arm64, armhf). However, I noticed that some of these C libraries are vulnerable and contain the following CVEs:

libzstd-jni.so from the C project zstd (version 1.3.7) exposes 2 vulnerabilities:
CVE-2021-24031, CVE-2019-11922
liblz4-java.so from the C project lz4 (version 1.8.3) exposes 2 vulnerabilities:
CVE-2021-3520, CVE-2019-17543

Suggested Vulnerability Patch Versions

zstd has fixed the vulnerabilities in versions >=1.4.9
lz4 has fixed the vulnerabilities in versions >=1.9.2

Java build tools cannot report vulnerable C libraries, which may introduce potential security issues into many downstream Java projects.
Could you please upgrade the above shared libraries to their patched versions?
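
A hedged sketch of how the transitive native wrappers could be pinned via Maven dependency management; the exact versions below are assumptions and should be checked against the releases that actually bundle zstd >= 1.4.9 and lz4 >= 1.9.2:

<dependencyManagement>
  <dependencies>
    <!-- assumption: zstd-jni release numbers track the bundled zstd version -->
    <dependency>
      <groupId>com.github.luben</groupId>
      <artifactId>zstd-jni</artifactId>
      <version>1.4.9-1</version>
    </dependency>
    <!-- assumption: lz4-java 1.8.0 bundles lz4 >= 1.9.2; verify before use -->
    <dependency>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
      <version>1.8.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>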

Thanks for your help~
Best regards,

label tag8 does not exist.

Hi, I ran into the following problem while using Exchange to migrate data from StarRocks to Nebula:

(screenshot of the error omitted)

The application.conf is:

{
  # Spark relation com.vesoft.exchange.common.config
  spark: {
    app: {
      name: Nebula Exchange 2.0
    }

    master:local

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }


  # Nebula Graph relation com.vesoft.exchange.common.config
  nebula: {
    address:{
      graph:["xxx:9669"]
      meta:["xxx1:9559"]
    }
    user: xxx
    pswd: xxx
    space: xxx


    # nebula client connection parameters
    connection {
      timeout: 3000
      retry: 3
    }
    
    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    # use google's RateLimiter to limit the requests send to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag com.vesoft.exchange.common.config examples for different dataSources.
  
  tags: [

    # MySql
    {
      name: tag8
      type: {
        source: mysql
        sink: client
      }
      host: "xxx"
      port: "9030"
      database: xxx
      table: xxx
      user: xxx
      password:"xxx"
      sentence: "select c_qxbh_fcfe925537604fa4bde0c2aa68a7a361, qxsjwyj_29add450718d44b8bcf27fb662f086a9, c_bs_b6dffaa9b6b147d0a8fd50800288d896, c_hld_a651c2b03c9846668bc6d8c3c36e8042, c_qxdj_f1ecbf4fdb124e2f98d5eff25e4a80ef, c_qxbx_f7fdd980711d4e4e99da81467489a6df, c_qxlx_a2b9afbc695443febadcdd60a6f143f7, c_qxms_f4456e3aa0bf4475ade0c8b2f3b2c31f, c_qxzt_c3e198b836054230892fff177c4bd4d2, c_zydl_cdf591c926564fbc8cac8bd32ec98575, c_zyxl_d9eaf86c143344ada25e31ad57a57897, c_qxly_fe2e01bcb0954347b69b8b687134911b, c_ywcqxclsj_a684714fe03847478964d61a89535dca, c_fxsj_ed665141711f4885bba25d68783dd901, c_fxbz_b4b73a92f6724f05ae064f33d382df45, c_fxr_ed67e99990434094b4ad4afbbbe1ee81, c_tbsj_b468f566a1274befb06a98e9edf2105f, c_tbbz_ec5a192032c44346bbb7cf1540404d1c, c_tbr_e6357b09c9df49e1b1c98f6b49c81448, c_sbsj_f97a16687fe64fbebe139622890f1e75, c_sbbz_af2245c6c33047b8a0ac1a53cc8749e7, c_sbr_b2ac44bae3b947d3a31a030950822f95, c_xqsj_b5d5d565c55b465099c84067e9e59da3, c_xqbz_eb26c79c58da4c56abed5da443cedd99, c_xqr_f2acb55006e94007b8920d69a28b8ccb, c_qxyy_d9021a741f9e42f0b223a1714c96e096, c_qxbw_a94c0cdb9ea84ae08ff75166334e3b5d, c_clcs_e3a8ff9e2c1e4ad9ad8499c0af50c555, c_clqkms_f13ed7ac430b44a09505c787a5565b03, c_yssj_ea33da1f865e41e7bdc79e429fdc3227, c_ysbm_bebf87655b074ac188659e0b172a2cfe, c_ysr_fcaec68d10954a09801e32c1a40ab8d6, c_sbbm_e838098bec344944afbe8beeef1588f8, c_sbyxglbz_ff1413b741314f538e23652b84a4f0c5, c_tyrq_f2f886d2632444748ec845f3a55cee99, c_lcjd_a7a77f5f90c84ed8bb531b244077eca6, c_clr_a453d320d21949f684def001c430868e, c_jsfl_cee20a42023f45e89d90c4b56e17fc61, c_qxyylb_beabf5fe5805497683b1fdbf0ca4d51a, c_sfxybpbj_c83485a68cef4c098bc118533eb56202, c_ylwt_f3c5fcc7612a437f852cfc66395e88e0, c_dgdw_ed1d23ac466e4d9b8279f0c8dff23c45 from collective_31478"
      fields: [c_qxbh_fcfe925537604fa4bde0c2aa68a7a361, qxsjwyj_29add450718d44b8bcf27fb662f086a9, c_bs_b6dffaa9b6b147d0a8fd50800288d896, c_hld_a651c2b03c9846668bc6d8c3c36e8042, c_qxdj_f1ecbf4fdb124e2f98d5eff25e4a80ef, c_qxbx_f7fdd980711d4e4e99da81467489a6df, c_qxlx_a2b9afbc695443febadcdd60a6f143f7, c_qxms_f4456e3aa0bf4475ade0c8b2f3b2c31f, c_qxzt_c3e198b836054230892fff177c4bd4d2, c_zydl_cdf591c926564fbc8cac8bd32ec98575, c_zyxl_d9eaf86c143344ada25e31ad57a57897, c_qxly_fe2e01bcb0954347b69b8b687134911b, c_ywcqxclsj_a684714fe03847478964d61a89535dca, c_fxsj_ed665141711f4885bba25d68783dd901, c_fxbz_b4b73a92f6724f05ae064f33d382df45, c_fxr_ed67e99990434094b4ad4afbbbe1ee81, c_tbsj_b468f566a1274befb06a98e9edf2105f, c_tbbz_ec5a192032c44346bbb7cf1540404d1c, c_tbr_e6357b09c9df49e1b1c98f6b49c81448, c_sbsj_f97a16687fe64fbebe139622890f1e75, c_sbbz_af2245c6c33047b8a0ac1a53cc8749e7, c_sbr_b2ac44bae3b947d3a31a030950822f95, c_xqsj_b5d5d565c55b465099c84067e9e59da3, c_xqbz_eb26c79c58da4c56abed5da443cedd99, c_xqr_f2acb55006e94007b8920d69a28b8ccb, c_qxyy_d9021a741f9e42f0b223a1714c96e096, c_qxbw_a94c0cdb9ea84ae08ff75166334e3b5d, c_clcs_e3a8ff9e2c1e4ad9ad8499c0af50c555, c_clqkms_f13ed7ac430b44a09505c787a5565b03, c_yssj_ea33da1f865e41e7bdc79e429fdc3227, c_ysbm_bebf87655b074ac188659e0b172a2cfe, c_ysr_fcaec68d10954a09801e32c1a40ab8d6, c_sbbm_e838098bec344944afbe8beeef1588f8, c_sbyxglbz_ff1413b741314f538e23652b84a4f0c5, c_tyrq_f2f886d2632444748ec845f3a55cee99, c_lcjd_a7a77f5f90c84ed8bb531b244077eca6, c_clr_a453d320d21949f684def001c430868e, c_jsfl_cee20a42023f45e89d90c4b56e17fc61, c_qxyylb_beabf5fe5805497683b1fdbf0ca4d51a, c_sfxybpbj_c83485a68cef4c098bc118533eb56202, c_ylwt_f3c5fcc7612a437f852cfc66395e88e0, c_dgdw_ed1d23ac466e4d9b8279f0c8dff23c45
      ]
      nebula.fields: [c_qxbh_fcfe925537604fa4bde0c2aa68a7a361, qxsjwyj_29add450718d44b8bcf27fb662f086a9, c_bs_b6dffaa9b6b147d0a8fd50800288d896, c_hld_a651c2b03c9846668bc6d8c3c36e8042, c_qxdj_f1ecbf4fdb124e2f98d5eff25e4a80ef, c_qxbx_f7fdd980711d4e4e99da81467489a6df, c_qxlx_a2b9afbc695443febadcdd60a6f143f7, c_qxms_f4456e3aa0bf4475ade0c8b2f3b2c31f, c_qxzt_c3e198b836054230892fff177c4bd4d2, c_zydl_cdf591c926564fbc8cac8bd32ec98575, c_zyxl_d9eaf86c143344ada25e31ad57a57897, c_qxly_fe2e01bcb0954347b69b8b687134911b, c_ywcqxclsj_a684714fe03847478964d61a89535dca, c_fxsj_ed665141711f4885bba25d68783dd901, c_fxbz_b4b73a92f6724f05ae064f33d382df45, c_fxr_ed67e99990434094b4ad4afbbbe1ee81, c_tbsj_b468f566a1274befb06a98e9edf2105f, c_tbbz_ec5a192032c44346bbb7cf1540404d1c, c_tbr_e6357b09c9df49e1b1c98f6b49c81448, c_sbsj_f97a16687fe64fbebe139622890f1e75, c_sbbz_af2245c6c33047b8a0ac1a53cc8749e7, c_sbr_b2ac44bae3b947d3a31a030950822f95, c_xqsj_b5d5d565c55b465099c84067e9e59da3, c_xqbz_eb26c79c58da4c56abed5da443cedd99, c_xqr_f2acb55006e94007b8920d69a28b8ccb, c_qxyy_d9021a741f9e42f0b223a1714c96e096, c_qxbw_a94c0cdb9ea84ae08ff75166334e3b5d, c_clcs_e3a8ff9e2c1e4ad9ad8499c0af50c555, c_clqkms_f13ed7ac430b44a09505c787a5565b03, c_yssj_ea33da1f865e41e7bdc79e429fdc3227, c_ysbm_bebf87655b074ac188659e0b172a2cfe, c_ysr_fcaec68d10954a09801e32c1a40ab8d6, c_sbbm_e838098bec344944afbe8beeef1588f8, c_sbyxglbz_ff1413b741314f538e23652b84a4f0c5, c_tyrq_f2f886d2632444748ec845f3a55cee99, c_lcjd_a7a77f5f90c84ed8bb531b244077eca6, c_clr_a453d320d21949f684def001c430868e, c_jsfl_cee20a42023f45e89d90c4b56e17fc61, c_qxyylb_beabf5fe5805497683b1fdbf0ca4d51a, c_sfxybpbj_c83485a68cef4c098bc118533eb56202, c_ylwt_f3c5fcc7612a437f852cfc66395e88e0, c_dgdw_ed1d23ac466e4d9b8279f0c8dff23c45]
      vertex: {
        field: com_ontology_entity_025d7bec0eb34213ac4c89b87f15376d
        # policy: "hash"
      }
      batch: 256
      partition: 32
    }
    ]
}

Could you please take a look when you have time? Thanks.

Error handling Null cast: `java.lang.String cannot be cast to scala.runtime.Null$`

/data/spark/spark-2.4.5-bin-hadoop2.6/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange /data/spark/exchange/nebula-exchang
/10/12 12:29:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (com.vesoft.exchange.common.config.Configs$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to scala.runtime.Null$
	at com.vesoft.exchange.common.config.Configs$.com$vesoft$exchange$common$config$Configs$$dataSourceConfig(Configs.scala:899)
	at com.vesoft.exchange.common.config.Configs$$anonfun$parse$2.apply(Configs.scala:432)
	at com.vesoft.exchange.common.config.Configs$$anonfun$parse$2.apply(Configs.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.vesoft.exchange.common.config.Configs$.parse(Configs.scala:394)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:79)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

support generating a unique directory for error.path to avoid data confusion

Introduction

Now the error path is configured in the user config file. If you run the Exchange app more than once without modifying error.output, the path is reused and new data is appended to the existing files in the error path, mixing up the data.

To avoid this, we need to generate a unique directory for every Spark application to store the data that failed to be written (see the sketch below).
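
A minimal sketch of the idea, assuming the Spark application id is unique enough; this is not the actual Exchange implementation:

import org.apache.spark.sql.SparkSession

/** Derive a per-application error directory from the configured error.output. */
def uniqueErrorPath(configuredOutput: String, spark: SparkSession): String = {
  // applicationId looks like application_1680000000000_0001 (YARN) or local-1680000000000
  val appId = spark.sparkContext.applicationId
  s"${configuredOutput.stripSuffix("/")}/$appId"
}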

Contents

Related work

The HDFS file upload utility class may lose data.

After analyzing the code logic for uploading files to HDFS, there appears to be a risk of losing data.

Code location: https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/main/scala/com/vesoft/exchange/common/utils/HDFSUtils.scala
Method:

def upload(localPath: String, remotePath: String, namenode: String = null): Unit = {
  try {
    val localFile = new File(localPath)
    if (!localFile.exists() || localFile.length() <= 0) {
      return
    }
  } catch {
    case e: Throwable =>
      LOG.warn("check for empty local file error, but you can ignore this check error. " +
                 "If there is empty sst file in your hdfs, please delete it manually",
               e)
  }
  val system = getFileSystem(namenode)
  system.copyFromLocalFile(new Path(localPath), new Path(remotePath))
}

(1) The existence check: from the call path, a file is only uploaded after it has been generated. If the file no longer exists at upload time, no exception is thrown and nothing is recorded, so this code effectively swallows the error, which looks like a risk of losing data.

  if (!localFile.exists() || localFile.length() <= 0) {
    return
  }

(2) The catch block seems to have the same problem of not really handling the exception:

  } catch {
    case e: Throwable =>
      LOG.warn("check for empty local file error, but you can ignore this check error. " +
                 "If there is empty sst file in your hdfs, please delete it manually",
               e)
  }

(3) Testing shows that when the local file is deleted by another task under concurrency, a file of size 0 ends up on HDFS, which breaks ingest.

Proposed solution

Personally I think these exceptions should be rethrown uniformly; when an executor hits the exception the container is killed and the task is retried, which preserves data integrity (see the sketch below).

Looking forward to a reply.
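
A hedged sketch of the rethrowing variant proposed above, not the current implementation; getFileSystem is assumed to be the existing helper in HDFSUtils:

import java.io.File
import org.apache.hadoop.fs.Path

def upload(localPath: String, remotePath: String, namenode: String = null): Unit = {
  val localFile = new File(localPath)
  // fail loudly instead of silently skipping, so Spark can retry the task
  if (!localFile.exists() || localFile.length() <= 0) {
    throw new IllegalStateException(
      s"local sst file $localPath is missing or empty, refusing to upload")
  }
  val system = getFileSystem(namenode)
  system.copyFromLocalFile(new Path(localPath), new Path(remotePath))
}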

Whether to forcibly enable repartitioning when the number of nebula space partitions is greater than 1

I found a problem that results in the generated SST file containing only the key without the TagID.

(screenshot of the SST keys omitted)

Description: According to the structure of 3.0 vertex data:

(diagram of the 3.0 vertex data structure omitted)

If all goes well, when the Exchange program finishes, the SST file will contain data for both keys:

    {
      name: tag-name-1
      type: {
        source: csv
        sink: sst
      }
      path: hdfs tag path 2

      fields: [csv-field-0, csv-field-1, csv-field-2]
      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
      vertex: {
        field:csv-field-0
      }
      separator: ","
      header: true
      batch: 256
      partition: 32
      repartitionWithNebula: false
    }

However, with the above configuration file, the generated SST files will only contain the key without the TagID.

Here is why: the SST writer is switched whenever the partition of the key changes, so data that comes later in the same task overwrites earlier data for the same part:

https://github.com/DemocracyAndLiberty/nebula-exchange/blob/master/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala

        if (part != currentPart) {
          if (writer != null) {
            writer.close()
            val localFile = s"$localPath/$currentPart-$taskID.sst"
            HDFSUtils.upload(localFile,
                             s"$remotePath/${currentPart}/$currentPart-$taskID.sst",
                             namenode)
            Files.delete(Paths.get(localFile))
          }
          currentPart = part
          val tmp = s"$localPath/$currentPart-$taskID.sst"
          writer = new NebulaSSTWriter(tmp)
          writer.prepare()
        }

According to https://github.com/DemocracyAndLiberty/nebula-exchange/blob/master/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/Processor.scala, I noticed that setting repartitionWithNebula to true solves this problem when the number of nebula space partitions is greater than 1.

So should repartitioning be forcibly enabled when the number of nebula space partitions is greater than 1?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport

The following error occurred when using the Exchange tool to import MySQL data:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamWriteSupport
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:207)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
at com.vesoft.nebula.exchange.reader.MySQLReader.read(ServerBaseReader.scala:94)
at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:258)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:105)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:95)
at scala.collection.immutable.List.foreach(List.scala:431)
at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:95)
at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamWriteSupport
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 52 more
22/07/06 16:32:29 INFO SparkContext: Invoking stop() from shutdown hook
22/07/06 16:32:29 INFO SparkUI: Stopped Spark web UI at http://172.29.200.207:4040
22/07/06 16:32:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/07/06 16:32:29 INFO MemoryStore: MemoryStore cleared
22/07/06 16:32:29 INFO BlockManager: BlockManager stopped
22/07/06 16:32:29 INFO BlockManagerMaster: BlockManagerMaster stopped
22/07/06 16:32:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/07/06 16:32:29 INFO SparkContext: Successfully stopped SparkContext
22/07/06 16:32:29 INFO ShutdownHookManager: Shutdown hook called
22/07/06 16:32:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-ec124241-b2e3-49f8-a79a-258fe4994826
22/07/06 16:32:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-08488d38-d9e2-4c52-a138-9e75b73cdb7b

[doc improve]How to import data from multiple tables in mysql into one tag/edge

nebula-exchange version v2.6

As the title says, after some googling I found https://stackoverflow.com/questions/49130381/instead-of-fetching-multiple-tables-using-pyspark-how-can-we-execute-join-query, but the reader code here needs to be modified, and while editing this issue I found the modification is the same as:

if (sentence != null) {
  df.createOrReplaceTempView(mysqlConfig.table)
  session.sql(sentence)
} else {
  df
}

So it would be better to add this usage to the documentation.

doc: Repo desc and topics

Desc:

  • NebulaGraph Exchange is an Apache Spark application to parse data from different sources to NebulaGraph in a distributed environment. It supports both batch and streaming data in various formats and sources including other Graph Databases, RDBMS, Data warehouses, NoSQL, Message Bus, File systems, etc.

Topics:

  • Graph-Database
  • Spark
  • NebulaGraph
  • Data-Pipeline
  • Data-Import
  • ETL

Please @Nicole00 help review/rephrase and then let's ask @Shinji-IkariG to help edit the repo :)

Incorrect partition id is generated in SST mode

Description

The GetPartitionId algorithm in Exchange is not aligned with Nebula Graph. As a result, string vids of exactly 8 bytes may be assigned to the wrong partition, and after ingestion those vertices cannot be fetched.

If you look at the code in Nebula Graph, the main difference is that Exchange always hashes string vids, while Nebula treats any 8-byte value as a uint64 (see the sketch below).
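
A hedged sketch of the alignment described above, not the actual patch; the little-endian reinterpretation and the hashing branch shown here are assumptions for illustration:

import java.nio.{ByteBuffer, ByteOrder}
import scala.util.hashing.MurmurHash3

def getPartitionId(vidBytes: Array[Byte], partitionNum: Int): Int = {
  val value: Long =
    if (vidBytes.length == 8) {
      // 8-byte vids are reinterpreted as a 64-bit integer instead of being hashed
      ByteBuffer.wrap(vidBytes).order(ByteOrder.LITTLE_ENDIAN).getLong
    } else {
      // placeholder for the hash Exchange already uses for other vid lengths
      MurmurHash3.bytesHash(vidBytes, 0xc70f6907).toLong
    }
  // treat the 64-bit value as unsigned when mapping it to a 1-based partition id
  (java.lang.Long.remainderUnsigned(value, partitionNum.toLong) + 1).toInt
}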

Get error with the fix from https://github.com/vesoft-inc/nebula-exchange/pull/145

Please check the FAQ documentation before raising an issue

Describe the bug (required)

After merging PR #145 into my branch, I still got the error:

Exception in thread "main" java.lang.NoSuchMethodError: 'java.lang.String com.google.common.net.HostAndPort.getHost()'
    at com.vesoft.exchange.common.GraphProvider.$anonfun$new$1(GraphProvider.scala:36)

Your Environments (required)

version: Nebula Exchange 3.0

io.streamnative.connectors:pulsar-spark-connector_2.11:jar:2.4.5

io.streamnative.connectors:pulsar-spark-connector_2.11:jar:2.4.5 cannot be found; I tried various mirrors and none of them has this version.
After changing it to the following, the build packages successfully:

<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-spark-connector_2.12</artifactId>
    <version>3.1.1.2</version>
</dependency>

Not enough space to cache rdd in memory warning

Please check the FAQ documentation before raising an issue

Describe the bug (required)
I am running nebula-exchange with spark-operator, and I get "Not enough space to cache rdd in memory" warnings.

Your Environments (required)

  • nebula cluster info
    v3.4.0
    graphd: 3 (128 Gb, 2 TB SSD Drive)
    metad: 3 (128 Gb, 2 TB SSD Drive)
    storaged: 5 (128 Gb, 2 TB SSD Drive)

  • spark-operator
    driver: 1 (16 GB)
    Executor: 20 (140 GB)

  • OS: uname -a
  • Commit id (e.g. a3ffc7d8)

How To Reproduce(required)

Steps to reproduce the behavior:

  1. Step 1
  2. Step 2
  3. Step 3

Expected behavior

Additional context

{"level":"WARN","timestamp":"2023-03-30 23:38:41,633","thread":"Executor task launch worker for task 5592.0 in stage 1.0 (TID 5593)","message":"Not enough space to cache rdd_6_5592 in memory! (computed 24.6 MiB so far)"}
{"level":"WARN","timestamp":"2023-03-30 23:38:41,633","thread":"Executor task launch worker for task 5592.0 in stage 1.0 (TID 5593)","message":"Persisting block rdd_6_5592 to disk instead."}
{"level":"WARN","timestamp":"2023-03-30 23:38:42,289","thread":"Executor task launch worker for task 5580.0 in stage 1.0 (TID 5581)","message":"Not enough space to cache rdd_6_5580 in memory! (computed 94.3 MiB so far)"}
{"level":"WARN","timestamp":"2023-03-30 23:38:42,289","thread":"Executor task launch worker for task 5580.0 in stage 1.0 (TID 5581)","message":"Persisting block rdd_6_5580 to disk instead."}
{"level":"WARN","timestamp":"2023-03-30 23:38:43,286","thread":"Executor task launch worker for task 5580.0 in stage 1.0 (TID 5581)","message":"Not enough space to cache rdd_6_5580 in memory! (computed 94.3 MiB so far)"}
{"level":"WARN","timestamp":"2023-03-30 23:38:44,604","thread":"Executor task launch worker for task 5592.0 in stage 1.0 (TID 5593)","message":"Not enough space to cache rdd_6_5592 in memory! (computed 94.2 MiB so far)"}

config: extract local configs in tag/edge to global

  • Extract some local configs in tag/edge to the global config, and keep the local config, which has a higher priority (see the sketch below).
  • Candidates include type.source, type.sink, batch, partition, header, delimiter; for database data sources, the user, password, url, and driver configs can also be extracted to the global level.
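
A hedged sketch of what such a global section could look like; the defaults block and its key names are a proposal for illustration, not existing Exchange configuration:

  # hypothetical global defaults, overridable per tag/edge
  defaults: {
    type: { source: mysql, sink: client }
    batch: 256
    partition: 32
    user: root
    password: "******"
    url: "jdbc:mysql://127.0.0.1:3306/db"
    driver: "com.mysql.cj.jdbc.Driver"
  }

  tags: [
    {
      name: person
      # local value wins over the global default
      batch: 512
    }
  ]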

could nebula-exchange support upload sst file to hdfs with krb5

Could nebula-exchange support uploading sst files to HDFS with krb5?
I submitted a Spark task and ran into the problem below:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 0.0 failed 8 times, most recent failure: Lost task 18.7 in stage 0.0 (TID 12, hdp447.bigdata.zzt.qianxin-inc.cn, executor 10): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS];
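
A minimal sketch of the Hadoop API that would be involved, assuming a keytab is available on the executor nodes; the principal and keytab path are hypothetical, and this is not something Exchange supports today:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

val conf = new Configuration()
conf.set("hadoop.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab(
  "nebula@EXAMPLE.COM",                    // hypothetical principal
  "/etc/security/keytabs/nebula.keytab")   // hypothetical keytab path
// subsequent FileSystem calls (e.g. the sst upload) then run as the authenticated user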

support hierarchical json format in json datasource

Today, the json datasource only supports a "flat" format, i.e. each line of the json file should look like:

{
  "key1": "value1",
  "key2": "value2",
  ...
}

In some cases, the json file uses a hierarchical structure, where the value of a key is a child object or even an array of objects, e.g.:

{
"key1":
       [
           {"key11":"value11", "key12":"value12"},
           {"key21":"value21", "key22":"value22"},
           ...
       ]
}

Can we add support for this?
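
A sketch of how such a hierarchical source could be flattened with plain Spark SQL before the rows are handed to Exchange; the path is hypothetical and "key1"/"key11"/"key12" follow the example above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder().appName("flatten-json-sketch").getOrCreate()
val df = spark.read.option("multiLine", "true").json("/path/to/nested.json")   // hypothetical path

val flat = df
  .select(explode(col("key1")).as("item"))          // one row per array element
  .select(col("item.key11").as("key11"), col("item.key12").as("key12"))
flat.show()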

Support for generating SST files from Hive tables and storing them in S3.

Is your feature request related to a problem? Please describe.
Currently, the sst files generated in Spark can only be stored on HDFS, which is inconvenient. It would be best if the generated sst files could be stored directly on S3.

Describe the solution you'd like
Hive to sst, with configuration support for writing to S3.

Additional context
The data goes from Hive to sst and is then imported into NebulaGraph. If the sst files could be written directly to S3 object storage, downloading them would be much more convenient.
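
A hedged sketch, not a supported Exchange feature: pointing the Hadoop FileSystem layer at S3 through s3a so that a remote path such as s3a://bucket/sst could work; the endpoint and credentials are placeholders, and hadoop-aws must be on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sst-to-s3-sketch").getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.endpoint", "s3.example.com")   // placeholder endpoint
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")     // placeholder credentials
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")
// with this in place, copyFromLocalFile-style uploads can target s3a:// paths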

support custom repartitioner when writing sst files

  1. Support a custom repartitioner that repartitions the sst key-value data according to the Nebula part (see the sketch below).
  2. Make sure the keys in different sst files do not overlap.
  3. Improve the sst files' ingest process.
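
A hedged sketch of such a repartitioner, keyed by the Nebula partition id so that each Spark partition only writes keys belonging to one Nebula part; this is not the actual implementation:

import org.apache.spark.Partitioner

class NebulaPartitioner(nebulaPartNum: Int) extends Partitioner {
  override def numPartitions: Int = nebulaPartNum
  // assumes the RDD key carries the 1-based Nebula part id it was encoded with
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] - 1
}

// usage sketch: keyValueRdd.repartitionAndSortWithinPartitions(new NebulaPartitioner(partNum))
// sorting within each partition also keeps the SST keys ordered for ingest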

When importing Oracle data, it is not possible to use the Oracle built-in json_value function.

Software versions

  • spark2.4.3
  • nebula-exchange_spark_2.4-3.3.0.jar
  • oracle19.3

SQL executed

SELECT UNIQUE_ID unique_id, json_value(ATTRS, '$.node_name') as node_name  FROM KG_NODE_INFO WHERE NODE_TYPE='xxx' and APPID='xxx'

Error message (a possible workaround is sketched after the stack trace):

Caused by: org.apache.spark.sql.AnalysisException: Undefined function: 'json_value'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 28
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
	at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1280)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1272)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:256)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:255)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:261)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:261)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:83)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:83)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
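
A hedged workaround sketch, outside Exchange itself: have Oracle evaluate json_value by passing the statement as a JDBC subquery, so Spark SQL never has to parse the function; the connection details are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("oracle-jsonvalue-sketch").getOrCreate()
val pushdownQuery =
  """(SELECT UNIQUE_ID unique_id,
    |        json_value(ATTRS, '$.node_name') as node_name
    |   FROM KG_NODE_INFO
    |  WHERE NODE_TYPE = 'xxx' AND APPID = 'xxx') t""".stripMargin

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//host:1521/service")  // placeholder connection string
  .option("dbtable", pushdownQuery)                        // executed by Oracle, not by Spark SQL
  .option("user", "xxx")
  .option("password", "xxx")
  .load()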

Few Questions: Importing SST files into Nebula graph

General Question

I was following Nebula official documentation

Below are few questions

  1. Is space partition_num and tags/edges partition we define in application.conf related?
  2. Should the tags/edges partition be equal to space partition_num?
  3. Can I submit SST file generation spark jobs multiple times with different application.conf settings?
  4. Should I copy SST folder from hdfs to all storage nodes?

@wey-gu Can you please help me here

get the `fs.default.name` from the remote path

Introduction

Now the sst path config includes local, remote, and hdfs.namenode. We can get fs.default.name from the remote path and remove the hdfs.namenode config. The user must then configure the remote path in the format hdfs://ip:port/path/ (see the sketch below).
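
A minimal sketch of deriving the namenode from the configured remote path, as proposed above; the example path is hypothetical:

import java.net.URI

def namenodeFromRemotePath(remotePath: String): String = {
  val uri = new URI(remotePath)              // e.g. hdfs://192.168.8.100:9000/sst
  require(uri.getScheme == "hdfs" && uri.getHost != null,
          s"remote path must look like hdfs://ip:port/path, got $remotePath")
  s"${uri.getScheme}://${uri.getHost}:${uri.getPort}"   // used as fs.default.name / fs.defaultFS
}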

Related work

Modify the config resolution process and the hdfs access process.

Kafka source optimization

If we wire upstream CDC sources to Kafka, we could have streaming data with different schemas and change modes (delete, insert, update).

We may improve Exchange's Kafka support on:

  • configuration could map data to multiple edge types?

I could relate to this limitation; now it seems it's already doable from the conf structure. @salahaz could you please help provide more context?

  • Insert/update/delete mode could be handled automatically?

@salahaz could you help provide more ideas on this pain point?

improve the config file

  • extract some local configs in tag/edge to global config

    • Extract some local configs in tag/edge to the global config, and keep the local config, which has a higher priority.
    • Candidates include type.source, type.sink, batch, partition, header, delimiter; for database data sources, the user, password, url, and driver configs can also be extracted to the global level.
  • add a command that generates a minimal config file template for a given datasource

    • spark-submit --master {mode} --class com.vesoft.nebula.exchange.Exchange exchange.jar -generateConfigTemplate csv

enhance the import status

Add more log info to show the number of successful requests and records, and make the Exchange log distinguishable from the Spark log.

In write-sst mode, the task exit status is still "success" when a Throwable is hit

Problem location
Code: https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala, line 112.
Code logic:

} catch {
  case e: Throwable => {
    LOG.error("sst file write error,", e)
    batchFailure.add(1)
  }
}

When the Spark task runs in an executor and hits a Throwable, the error count is recorded in the batchFailure variable and nothing else is done.
As a result, Spark reports the job as successful even though the sst files were not written successfully.

Issue to fix
(1) This can lose data. I think a fix could be added: validate on the driver, and if batchFailure > 0, change the Spark exit status to a non-normal exit (see the sketch below).

Question
(1) Why is the error recorded in the batchFailure variable instead of rethrowing the exception so that, after the executor retries the task 3 times, the whole job is marked as failed? Is this based on some convention?
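
A hedged sketch of the driver-side check proposed above: fail the whole Spark application when any executor recorded an sst write failure in the batchFailure accumulator:

import org.apache.spark.util.LongAccumulator

def exitIfSstWriteFailed(batchFailure: LongAccumulator): Unit = {
  if (batchFailure.sum > 0) {
    // a non-zero exit code marks the spark-submit / YARN application as failed
    sys.exit(1)
  }
}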

support the config file in hdfs

Exchange can run with the jar located in hdfs but cannot support a config file in hdfs; it fails and returns the error:
Exception in thread "main" java.lang.IllegalArgumentException: hdfs:/192.168.15.2:9000/tn/app_bs_cli.conf not exist
at com.vesoft.exchange.common.config.Configs$.parse(Configs.scala:260)
It should be supported by putting the config file in hdfs (a sketch follows below).
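
A sketch of how a config path on HDFS could be fetched to a local file before parsing, assuming the standard Hadoop client; Configs.parse would then receive the downloaded local copy:

import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def fetchConfigToLocal(hdfsPath: String, localDir: String = "/tmp"): File = {
  val fs    = FileSystem.get(new URI(hdfsPath), new Configuration())   // e.g. hdfs://host:9000/tn/app.conf
  val local = new File(localDir, new Path(hdfsPath).getName)
  fs.copyToLocalFile(new Path(hdfsPath), new Path(local.getAbsolutePath))
  local
}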
