Xgboost: [jvm-packages] PySpark Support Checklist

Created on 8 Jun 2018  ·  28 Comments  ·  Source: dmlc/xgboost

Overview:

This is a meta issue for implementing PySpark support.

TODO:

  • [x] Develop PySpark wrapper code.
  • [ ] Decide how to package the code.
  • [ ] Create a testing framework.
  • [ ] Write API Documentation.

Specifics:

Develop PySpark wrapper code:

I have developed a PySpark wrapper for the XGBoostEstimator, XGBoostRegressionModel and XGBoostClassificationModel objects, with an additional XGBoost specific Pipeline object, as our Scala estimators don't sit within org.apache.spark.ml.

Decide how to package the code:

There are a few location options here:

  1. Place the PySpark libraries inside the xgboost-spark jar. (Similar to how Spark-NLP and GraphFrames work)
  2. Distribute a separate zip file with the PySpark libraries, giving the option for PyPI packaging. (Similar to how H2O's PySparkling works)

We also need to decide on a Python package name, e.g. spark-xgboost, pyspark-xgboost or pyspark-xgb.

Create a testing framework:

Clearly we need to perform unit tests, similar to the discussion started in #3325

Write API Documentation:

We could do something similar to the existing Python one.

All 28 comments

Would you please send a WIP version of the PR while you are doing the work, so that we can discuss any problems as they come up?

@CodingCat there are a few other non-essential things I think we should support.

  • We should fix saveAsHadoopFile( in the base Spark XGBoost API, so that there is at least rudimentary interoperability with the base Python XGBoost API. (See: #2480).

    • While you can use .booster.saveModel("xx"), this does not support HDFS locations, causing it to simply not work in yarn-cluster mode.

  • In a similar vein to the above, I think we should implement some sort of toPython( method, which returns a Python API version of models/estimators, allowing us to use the visualization tools in that API. This would allow for some awesome pipelines where large data is used to train models on Spark, and then these models are visualized in base Python.

  • We should allow rawPredictionCol to be set in the base Spark XGBoost Estimator object. (Currently it can only be set in classification model objects.)

    • While this makes sense, as that estimator object could generate a regression model, it forces after-the-fact parameter injection if I want to rename the probabilities col (e.g. for BinaryClassificationEvaluator).
  • Dealing with null values is a bit of a pain, as having Float.NaN as the "missing" parameter seems to cause crashes. (Hilariously, Float.PositiveInfinity works fine, which I have been using.)

  • We need to implement early stopping, or something to that effect. Either we could implement the XGBoost object from the Scala API, or extend the XGBoostEstimator Scala object with some early stopping features.

@thesuperzapper

We should fix saveAsHadoopFile....While you can use .booster.saveModel("xx")

I think you can pass in an OutputStream created by FileSystem.create(), then you can work with an HDFS cluster: https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Booster.java#L338

...toPython....

It's a good suggestion, but I am not sure it is really worth much effort to deal with this complexity. Using the file system to exchange models is good enough for me.

...rawPredictionCol...

It's resolved in the ongoing refactoring work of xgboost4j-spark: https://github.com/dmlc/xgboost/pull/3313

.., Dealing with null values is a bit of pain, as having Float.NaN as the "missing" parameter seems to cause crashes. (Hilariously, Float.PositiveInfinity works fine, which I have been using.)

Noticed it. Do you know if it happens in other APIs?

...early stop...

does https://github.com/dmlc/xgboost/pull/2710/files work for you?

@CodingCat sorry about the massive delay, I have a bit more time to get #3376 rebased now.

In terms of saving the API interchangeable model to a Hadoop location, I think this should be implemented in the new XGBoostClassifier/XGBoostRegressor scala objects (#3313), and the pyspark wrapper should call down to that method.

In terms of the Float.NaN issue, I find that missing values crash all Spark-based XGBoost-0.72 APIs. (Specifying Float.PositiveInfinity as the missing value works fine, if you fill your nulls with it.) It seems to be the presence of Float.NaN in training which causes the crash, rather than what you specify as the missing value.
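The workaround described above (fill nulls with a sentinel, then declare that sentinel as the "missing" value) can be illustrated in plain Python. This is only a sketch of the idea on lists of rows; fill_missing and MISSING are hypothetical names, and in a real job the same fill would be applied to the Spark DataFrame before training.

```python
import math

# Sentinel used in place of nulls; the same value would be passed as the
# "missing" parameter of the estimator (assumption based on the comment above).
MISSING = float("inf")


def fill_missing(rows, sentinel=MISSING):
    """Replace None and NaN entries with the sentinel so no NaN reaches training."""
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    return [[sentinel if is_missing(v) else v for v in row] for row in rows]


rows = [[1.0, None, 3.0], [float("nan"), 2.0, 0.5]]
filled = fill_missing(rows)
```

After this step no NaN remains in the data, which matches the observation that the crash seems tied to NaN being present during training.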

For the early stopping, #2710 addresses it by wrapping the Spark estimator in a new Scala object, which breaks support for things like Pipelines and ParamGridBuilder. I would prefer to add an early stopping feature to the new XGBoostClassifier/XGBoostRegressor Scala objects.
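For readers unfamiliar with the feature being requested, the usual early stopping rule is: stop once the evaluation metric has not improved for a fixed number of rounds. The sketch below shows that rule on a list of per-round scores. It is not the implementation from #2710 or from XGBoost itself; early_stop and patience are illustrative names.

```python
def early_stop(scores, patience, maximize=True):
    """Return (best_round_index, rounds_evaluated) under a patience rule.

    Iteration stops as soon as `patience` consecutive rounds pass
    without beating the best score seen so far.
    """
    best_idx, best = None, None
    seen = 0
    for i, score in enumerate(scores):
        seen = i + 1
        improved = best is None or (score > best if maximize else score < best)
        if improved:
            best_idx, best = i, score
        elif i - best_idx >= patience:
            break
    return best_idx, seen


# Example: validation AUC per boosting round (made-up numbers).
aucs = [0.70, 0.74, 0.77, 0.76, 0.77, 0.75, 0.80]
best_round, rounds_run = early_stop(aucs, patience=3)
```

With patience=3 the loop stops after round index 5, so the late improvement at index 6 is never seen. That is the usual trade-off of a patience rule.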

In terms of saving the API interchangeable model to a Hadoop location, I think this should be implemented in the new XGBoostClassifier/XGBoostRegressor scala objects (#3313), and the pyspark wrapper should call down to that method.

If you are talking about the pyspark API rather than the python API, you can directly use the MLLIB model persistence APIs to read models, as we have implemented the MLWritable interface (https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala#L310-L311)

added the other two things to feature requests

about NaN leading to jvm crash, I found https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24760 in Spark JIRA....I think it might be relevant

@CodingCat for the saving thing, I meant the case where you want to save a model that you trained in Spark and read it back into Python/R. This situation would benefit dramatically if I could call saveToHadoop( on the model object.

For the NaN crash, this crash happens in the base XGBoost Spark Scala API, (At least in 0.72), the fact it happens in my PySpark wrapper is a side effect of this.

@thesuperzapper with the current code, you can persist model in xgboost4j-spark with MLLIB model persistence API and read it with the coming pyspark API, then with the read model you can do anything with model.booster

I do not see a problem here....

Even if we implement the functionality to support an HDFS path in https://github.com/dmlc/xgboost/blob/6bed54ac39f5936b62887b0e0ed5b17a57209e15/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Booster.java#L338, are you sure you can read it with the pure python API (not the pyspark API) directly?

For NaN, yes, the link I posted is also about Pandas not working with NaN + JVM

I suspect there is something wrong with NaN when JVM interacts with other programming languages, maybe it was transformed to something weird in native layer

@CodingCat I think we might not be understanding each other.

For the model saving, you have to use .booster.saveModel(; if you use any other model saving method you get XGBoostError: b'[16:31:29] src/gbm/gbm.cc:20: Unknown gbm type ' (See: #2480)

For the NaN, this happens even if you just use the Scala API (No python anywhere in the chain), so to fix the bug, we need to get it working in the Scala API.

@thesuperzapper

  1. what you need is booster.saveModel("a hdfs path here"), right?

  2. my question is, even if you have the functionality in 1, are you sure the current xgboost supports loadModel("a hdfs path here")? (this is a question)

  3. on the other side, I think we need to be clear about how we want to establish the interchangeability: if you want to do spark <-> python/R, then we need to ensure the answer to question 2 is yes. If you want to do scala spark <-> pyspark <-> python/R (this is what I understand from you), we can do it through the already-existing MLLIB persistence API

regarding NaN...I am not talking about python, I am talking about cross language conversion, that's from scala to native

@CodingCat

  1. Yea, that would be good, or just fix the already existing .saveModelAsHadoopFile(" to create a readable model for the python/R APIs.

  2. I am not sure, but I doubt that .loadModel supports hdfs. But we could implement the complement of above .readModelFromHadoopFile(" method in the spark API.

  3. I really don't know how you can use the existing mllib persistence API to get a trained model into python/R, can you explain?

  4. The NaN thing: Yea, it's definitely related to some of that Scala/C interaction, as the error it throws is a C one.

  1. looks like even though loadModel does not support an hdfs path, you can pass in an InputStream which is built via an HDFS FileSystem instance

check https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala#L418-L432

I really don't know how you can use the existing mllib persistence API to get a trained model into python/R, can you explain?

with the same piece of code, you can call the counterpart of XGBoostClassificationModel.load(....) in your pyspark API to read a model, then with the counterpart of https://github.com/dmlc/xgboost/blob/2200939416af2fd6b7eed334b5067bf483170f5e/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala#L215 in pyspark API, you can get booster in python....this booster is identical to that in python API so you can do whatever you want in pure python API

@CodingCat I think I was forgetting that the Scala API had been rewritten, will need to alter my wrapper for it to work with 0.8.

@thesuperzapper any update on this?

@CodingCat, sorry about the delay, haven't started the rewrite yet, was partially waiting for stability in the API, and partially extremely busy. I will give this a further look this week.

Are there any docs yet for the new API structure?

@thesuperzapper no problem,

the new API structure is very simple, it only contains the standard interface of Spark MLLIB transform and fit

all the other configurations, like transform leaf, etc., are enabled by setting a column name

we have a tutorial in progress at https://github.com/CodingCat/xgboost/pull/4

any update on this?

@CodingCat yea, the issue I keep running into is allowing support for pipeline persistence. In my initial PR, I supported this by creating my own pipeline object, but I don't like that solution as it's messy and leaves way too much code to support in the future.

As of Spark 2.3, there is DefaultParamsWritable, and I think there must be a way to get that working with the default pipeline object. However it is unlikely that a pipeline written in python will be readable in the Scala API.

@thesuperzapper are you still actively working on this?

@yanboliang can take over or help you if you are busy with other things

@CodingCat yea, a few weeks ago I put up a development build for 0.8 in issue #1698.

I really just need people to test it and tell me what I missed, and what's not working. Additionally, I don't want to introduce the XGBoostPipeline object. If possible I would really want to use the new Python Spark 2.3 DefaultParamsWritable API, but haven't really seen any reference implementation for that yet.

The only serious issue I keep running into is that classification models won't load back after being saved, giving the error: TypeError: 'JavaPackage' object is not callable. However, strangely, XGBoostPipelineModel works just fine with an XGBoost classification stage. This leads me to think it is an issue on my end. Can someone verify if reading classification models works for them?

I will update this PR (#3376) with that dev build, but it still needs some work.

I intend to use this code with an incremental optimizer which has to run on just one machine, or at least a limited number of them (e.g: Tree Parzen Estimator). This has me having to wrap XGBoostClassifier/Regressor into another Estimator that'll just randomize params according to uniform distributions. This system allows us to converge a lot faster than with GridSearch.
It's just the search space that changes, auc based cross validation is still here.
Can this solution be easily implemented by hacking the Py4J wrappers that'll come out of this?
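To make the request above concrete, here is a minimal sketch of the "randomize params according to uniform distributions" part, independent of Spark. It is plain random search, not Tree Parzen Estimator, and every name in it (sample_params, random_search, toy_score, the parameter ranges) is hypothetical. In practice score_fn would fit the wrapped estimator and return a cross-validated AUC.

```python
import random


def sample_params(space, rng):
    """Draw one value per parameter from a uniform distribution over (low, high)."""
    return {name: rng.uniform(low, high) for name, (low, high) in space.items()}


def random_search(space, score_fn, n_trials, seed=0):
    """Evaluate n_trials random parameter sets and keep the best-scoring one."""
    rng = random.Random(seed)
    best_params, best_score = None, float("-inf")
    for _ in range(n_trials):
        params = sample_params(space, rng)
        score = score_fn(params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score


# Hypothetical search space: (low, high) bounds per parameter.
space = {"eta": (0.01, 0.3), "subsample": (0.5, 1.0)}


def toy_score(params):
    # Stand-in for a cross-validated AUC; peaks at eta=0.1, subsample=0.8.
    return -abs(params["eta"] - 0.1) - abs(params["subsample"] - 0.8)


best_params, best_score = random_search(space, toy_score, n_trials=50, seed=42)
```

Because the sampler only needs a callable that maps a parameter dict to a score, it does not care whether the estimator behind it is the Scala object, a Py4J wrapper, or something else.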

Another thing is that we use multiple environments, and I see that the serialization code for the model is different between Python and XGBoost. Is interoperability planned through this PR?

@thesuperzapper I am using xgboost4j-spark-0.80.jar along with xgboost4j-0.80.jar, as well as the wrapper functions stored in sparkxgb.zip, which is compatible with version 0.80 of xgboost. I am able to make predictions after fitting the data with a list of features and a target value. I would now like to extract the information gain of each feature, but I have not managed to figure out a way to obtain these results. Would someone be able to point me in the right direction? Much appreciated!

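from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from sparkxgb import XGBoostClassifier  # assumed import path for the sparkxgb.zip wrapper

prediction = ['prediction']
features = ['feature_1', 'feature_2', 'feature_3', 'feature_4']

# Assemble the feature columns into a single vector column
vectorAssembler = VectorAssembler().setInputCols(features).setOutputCol('features')

xgboost = XGBoostClassifier(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction'
)

pipeline = Pipeline().setStages([vectorAssembler, xgboost])
model = pipeline.fit(train)  # train and test are existing Spark DataFrames

results = model.transform(test).select(features + prediction)
results.select('prediction').distinct().collect()
# [Row(prediction=0.0), Row(prediction=1.0)]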

I have tried accessing the below with no luck.

print(model.stages[1].nativeBooster.getFeatureScore.__doc__)

Help on class Booster in package ml.dmlc.xgboost4j.scala:

Booster implements scala.Serializable, com.esotericsoftware.kryo.KryoSerializable {
|  
|  Methods defined here:
|  
|  getFeatureScore(String) : Map
|  
|  ------------------------------------------------------------
|  Fields defined here:
|  
|  ------------------------------------------------------------
|  Internal classes defined here:
|  
}
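The help output above shows that getFeatureScore takes a string and returns a Map. Assuming the scores can be brought into Python as a dict keyed by XGBoost's default feature names ("f0", "f1", ... indexing into the assembled vector), they can be mapped back to column names as sketched below. The raw values here are made up and name_feature_scores is a hypothetical helper. Note also that, as far as I know, getFeatureScore reports how often a feature is used in splits rather than information gain.

```python
def name_feature_scores(scores, feature_names):
    """Map keys like "f2" to feature_names[2] and sort by score, highest first."""
    named = {}
    for key, value in scores.items():
        index = int(key[1:])  # strip the leading "f": "f2" -> 2
        named[feature_names[index]] = value
    return sorted(named.items(), key=lambda item: item[1], reverse=True)


features = ['feature_1', 'feature_2', 'feature_3', 'feature_4']

# Hypothetical scores; features never used in a split are simply absent.
raw_scores = {"f0": 12, "f2": 30, "f3": 5}

ranked = name_feature_scores(raw_scores, features)
```

The order of the features list must match the order given to VectorAssembler, since the index in each key refers to a position in the assembled vector.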

@CodingCat has the above issue mentioned about information gain been resolved in version 1.0 that you mentioned in the other thread? Thank you in advance.

@thesuperzapper Are you still working on the wrapper?

We are working on it in this new PR: #4656
