Skip to content

Commit

Permalink
#1434 adding new rpc message type that is generic and can report vari…
Browse files Browse the repository at this point in the history
…ety of success and error result
  • Loading branch information
naleeha authored and keikeicheung committed Sep 5, 2024
1 parent d6d3dde commit a8eb90a
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ public PersonRpcHandler(DataTable table, TableContainer tableContainer) {
registerRpc("GetPeopleWithName", (params) -> processGetPeopleNameRpcRequest(params));
}

public RpcMethodCallResult processUpdateNameRpcRequest(RpcParams params) {
public RpcFunctionResult processUpdateNameRpcRequest(RpcParams params) {
updateName(
params.namedParams().get("Id").get().toString(), //how to report error when expected param missing or fail to cast to right type
params.namedParams().get("Name").get().toString()
);
return new RpcMethodSuccess(); //how to control what viewport action to trigger?
return new RpcFunctionSuccess(); //how to control what viewport action to trigger?
}

public RpcMethodCallResult processGetPeopleNameRpcRequest(RpcParams params) {
public RpcFunctionResult processGetPeopleNameRpcRequest(RpcParams params) {
var people = getPeopleWithNameThatStartWith(
Arrays.stream(params.params()).findFirst().toString()
);
return new RpcMethodSuccess(people); //need to return result
return new RpcFunctionSuccess(people); //need to return result
}

public String[] updateName(String id, String newName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public void should_register_java_function_as_rpc_in_default_handler() {
}

static class TestRpcService {
RpcMethodCallResult rpcFunction(RpcParams params) {
return new RpcMethodSuccess("It Works");
RpcFunctionResult rpcFunction(RpcParams params) {
return new RpcFunctionSuccess("It Works");
}
}
}
37 changes: 28 additions & 9 deletions vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.finos.toolbox.time.Clock
import org.finos.vuu.api.AvailableViewPortVisualLink
import org.finos.vuu.core.table.{DataType, TableContainer, ViewPortColumnCreator}
import org.finos.vuu.net._
import org.finos.vuu.net.rpc.{RpcFunctionFailure, RpcFunctionSuccess}
import org.finos.vuu.provider.{ProviderContainer, RpcProvider}
import org.finos.vuu.viewport._

Expand Down Expand Up @@ -93,7 +94,7 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
}

override def process(msg: ViewPortEditSubmitFormRpcCall)(ctx: RequestContext): Option[ViewServerMessage] = {
Try(viewPortContainer.callRpcEditFormSubmit(msg.vpId, ctx.session)) match {
Try(viewPortContainer.callRpcEditFormSubmit(msg.vpId, ctx.session)) match {
case Success(action) =>
logger.info("Processed VP Edit Submit From RPC call" + msg)
vsMsg(ViewPortEditRpcResponse(msg.vpId, "VP_EDIT_SUBMIT_FORM_RPC", action))(ctx)
Expand Down Expand Up @@ -225,13 +226,13 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
else {
val table = tableContainer.getTable(msg.table.table) //todo need to check module? what if modules with same table name

if(table == null)
if (table == null)
errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}")(ctx)
else{
else {

val viewPortDef = viewPortContainer.getViewPortDefinition(table)
val columns = viewPortDef.columns.sortBy(_.index)
val columnNames = columns.map(_.name)
val columnNames = columns.map(_.name)
val dataTypes = columns.map(col => DataType.asString(col.dataType))
vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx)
}
Expand Down Expand Up @@ -290,11 +291,11 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,

}

def validateColumns(table: RowSource, columns: Array[String]): Unit ={
def validateColumns(table: RowSource, columns: Array[String]): Unit = {
val invalidColumns = columns
.filter(!_.contains(":")) // remove calculated columns
.filter(name => ! table.asTable.getTableDef.columns.map(_.name).contains(name))
if(invalidColumns.nonEmpty){
.filter(name => !table.asTable.getTableDef.columns.map(_.name).contains(name))
if (invalidColumns.nonEmpty) {
logger.error("Invalid columns specified in viewport request:" + invalidColumns.mkString(","))
throw new Exception("Invalid columns specified in viewport request")
}
Expand Down Expand Up @@ -330,7 +331,7 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,

val groupByColumns = msg.groupBy.filter(vpColumns.getColumnForName(_).get != null).flatMap(vpColumns.getColumnForName).toList

val aggs = msg.aggregations.map(a => Aggregation(vpColumns.getColumnForName(a.column).get, a.aggType.toShort )).toList
val aggs = msg.aggregations.map(a => Aggregation(vpColumns.getColumnForName(a.column).get, a.aggType.toShort)).toList

val groupBy = new GroupBy(groupByColumns, aggs)

Expand Down Expand Up @@ -408,4 +409,22 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
viewPortContainer.closeNode(msg.vpId, msg.treeKey)
vsMsg(CloseTreeNodeSuccess(msg.vpId, msg.treeKey))(ctx)
}
}


override def process(msg: RpcRequest)(ctx: RequestContext): Option[ViewServerMessage] = {
val response = Try(viewPortContainer.handleRpcRequest(msg.context.viewPortId, msg.rpcName, msg.params)(ctx)) match {
case Success(functionResult) =>
logger.info(s"Processed VP RPC call ${ctx.requestId}" + msg)
functionResult match {
case RpcFunctionSuccess(data) =>
RpcResponseNew(rpcName = msg.rpcName, result = RpcResult(true, data, null), null)
case RpcFunctionFailure(_, error, exception) =>
RpcResponseNew(rpcName = msg.rpcName, RpcResult(false, null, errorMessage = error), null)
}
case Failure(e) =>
logger.info(s"Failed to process VP RPC call ${ctx.requestId}", e)
RpcResponseNew(rpcName = msg.rpcName, RpcResult(false, null, errorMessage = e.getMessage), null)
}
vsMsg(response)(ctx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.finos.vuu.core.module.typeahead

import org.finos.vuu.core.table.{DataTable, TableContainer}
import org.finos.vuu.net.{RequestContext, RpcNames}
import org.finos.vuu.net.rpc.{DefaultRpcHandler, RpcMethodCallResult, RpcMethodSuccess, RpcParams}
import org.finos.vuu.net.rpc.{DefaultRpcHandler, RpcFunctionResult, RpcFunctionSuccess, RpcParams}
import org.finos.vuu.viewport.ViewPortColumns

class MyTypeAheadHandler(rpcRegistry: DefaultRpcHandler, tableContainer: TableContainer) {
Expand All @@ -12,18 +12,18 @@ class MyTypeAheadHandler(rpcRegistry: DefaultRpcHandler, tableContainer: TableCo
rpcRegistry.registerRpc(RpcNames.UniqueFieldValuesStartWithRpc, params => processGetUniqueFieldValuesStartWithRequest(params))
}

def processGetUniqueFieldValuesRequest(params: RpcParams): RpcMethodCallResult = {
def processGetUniqueFieldValuesRequest(params: RpcParams): RpcFunctionResult = {
val values = getUniqueFieldValues(
params.namedParams("table").toString, //how to report error when expected param missing or fail to cast to right type
params.namedParams("module").toString,
params.namedParams("column").toString,
params.viewPortColumns.get,
null //todo what to do about request context
)
new RpcMethodSuccess(values)
new RpcFunctionSuccess(values)
}

def processGetUniqueFieldValuesStartWithRequest(params: RpcParams): RpcMethodCallResult = {
def processGetUniqueFieldValuesStartWithRequest(params: RpcParams): RpcFunctionResult = {
val values = getUniqueFieldValuesStartingWith(
params.namedParams("table").toString, //how to report error when expected param missing or fail to cast to right type
params.namedParams("module").toString,
Expand All @@ -32,7 +32,7 @@ class MyTypeAheadHandler(rpcRegistry: DefaultRpcHandler, tableContainer: TableCo
params.viewPortColumns.get,
null //todo what to do about request context
)
new RpcMethodSuccess(values) //how to control what viewport action to trigger?
new RpcFunctionSuccess(values) //how to control what viewport action to trigger?
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@ package org.finos.vuu.core.module.typeahead

import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.table.{DataTable, TableContainer}
import org.finos.vuu.net.rpc.{DefaultRpcHandler, RpcMethodCallResult, RpcMethodSuccess, RpcParams}
import org.finos.vuu.net.rpc.{DefaultRpcHandler, RpcFunctionResult, RpcFunctionSuccess, RpcParams}
import org.finos.vuu.net.{RequestContext, RpcNames}
import org.finos.vuu.viewport.ViewPortColumns

class ViewPortTypeAheadRpcHandler(tableContainer: TableContainer) extends DefaultRpcHandler with StrictLogging {

this.registerRpc(RpcNames.UniqueFieldValuesRpc, params => processGetUniqueFieldValuesRequest(params))
this.registerRpc(RpcNames.UniqueFieldValuesRpc, params => processGetUniqueFieldValuesRequestNew(params))
this.registerRpc(RpcNames.UniqueFieldValuesStartWithRpc, params => processGetUniqueFieldValuesStartWithRequest(params))

def processGetUniqueFieldValuesRequest(params: RpcParams): RpcMethodCallResult = {

def processGetUniqueFieldValuesRequestNew(params: RpcParams): RpcFunctionResult = {

val inputParam = params.data.get.asInstanceOf[Map[String, Any]]

val values = getUniqueFieldValues(
inputParam("table").toString, //how to report error when expected param missing or fail to cast to right type
inputParam("module").toString,
inputParam("column").toString,
params.viewPortColumns.get,
null //todo what to do about request context
)
new RpcFunctionSuccess(values)
}

def processGetUniqueFieldValuesRequest(params: RpcParams): RpcFunctionResult = {
val values = getUniqueFieldValues(
params.namedParams("table").toString, //how to report error when expected param missing or fail to cast to right type
params.namedParams("module").toString,
params.namedParams("column").toString,
params.viewPortColumns.get,
null //todo what to do about request context
)
new RpcMethodSuccess(values)
new RpcFunctionSuccess(values)
}

def processGetUniqueFieldValuesStartWithRequest(params: RpcParams): RpcMethodCallResult = {
def processGetUniqueFieldValuesStartWithRequest(params: RpcParams): RpcFunctionResult = {
val values = getUniqueFieldValuesStartingWith(
params.namedParams("table").toString, //how to report error when expected param missing or fail to cast to right type
params.namedParams("module").toString,
Expand All @@ -31,7 +46,7 @@ class ViewPortTypeAheadRpcHandler(tableContainer: TableContainer) extends Defaul
params.viewPortColumns.get,
null //todo what to do about request context
)
new RpcMethodSuccess(values) //how to control what viewport action to trigger?
new RpcFunctionSuccess(values) //how to control what viewport action to trigger?
}

def getUniqueFieldValues(tableName: String, moduleName: String, column: String, viewPortColumns: ViewPortColumns, ctx: RequestContext): Array[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class DefaultMessageHandler(val channel: Channel,
case req: ViewPortDeleteCellRpcCall => serverApi.process(req)(ctx)
case req: ViewPortEditSubmitFormRpcCall => serverApi.process(req)(ctx)
case req: ViewPortRpcCall => serverApi.process(req)(ctx)
case req: RpcRequest => serverApi.process(req)(ctx)
}
}

Expand Down
29 changes: 29 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/net/Messages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,32 @@ object UpdateType {
@JsonDeserialize(using = classOf[RowUpdateDeserializer])
case class RowUpdate(vpVersion: String, viewPortId: String, vpSize: Int, rowIndex: Int, rowKey: String, updateType: String, ts: Long, selected: Int, data: Array[Any])

/***
* New api for websocket messages - work in progress
*/

case class RpcRequest(context: RpcContext, rpcName: String, params: Any) extends MessageBody
case class RpcContext(viewPortId: String)

case class RpcResponseNew(rpcName: String, result: RpcResult, action: UIAction) extends MessageBody
case class RpcResult(isSuccess: Boolean, data: Any, errorMessage: String)


//trait RpcResult {
// val isSuccess: Boolean // todo use status enums? does that work with java
//}
//
//case class RpcSuccessResult(data: Any) extends RpcResult {
// override val isSuccess: Boolean = true
//}
//case class RpcErrorResult(errorMessage: String) extends RpcResult {
// override val isSuccess: Boolean = false
//}

trait UIAction {
val actionType: String
}

case class ShowNotificationAction(notificationType: String, title: String, message: String) extends UIAction {
override val actionType: String = "SHOW_NOTIFICATION_ACTION"
}
2 changes: 2 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/net/ServerApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@ trait ServerApi {

def process(msg: ViewPortAddRowRpcCall)(ctx: RequestContext): Option[ViewServerMessage]

def process(msg: RpcRequest)(ctx: RequestContext): Option[ViewServerMessage]

def disconnect(session: ClientSessionId): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,7 @@ import org.finos.vuu.net._
new Type(value = classOf[ViewPortAddRowRpcCall], name = "VP_EDIT_ADD_ROW_RPC"),
new Type(value = classOf[ViewPortEditRpcReject], name = "VP_EDIT_RPC_REJECT"),
new Type(value = classOf[ViewPortEditRpcResponse], name = "VP_EDIT_RPC_RESPONSE"),
new Type(value = classOf[RpcRequest], name = "RPC_REQUEST"),
new Type(value = classOf[RpcResponseNew], name = "RPC_RESPONSE"),
))
trait CoreJsonSerializationMixin {}
18 changes: 11 additions & 7 deletions vuu/src/main/scala/org/finos/vuu/net/rpc/DefaultRpcHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ class DefaultRpcHandler extends RpcHandler with StrictLogging {
override def processViewPortRpcCall(methodName: String, rpcParams: RpcParams): ViewPortAction = {
val result = processRpcMethodHandler(methodName, rpcParams)
result match {
case RpcMethodSuccess(result) =>
case RpcFunctionSuccess(result) =>
result match {
case Some(value) => DisplayResultAction(value)
case None => ViewPortRpcSuccess()
}
case _: RpcMethodFailure => ViewPortRpcFailure(s"Exception occurred calling rpc $methodName")
case _: RpcFunctionFailure => ViewPortRpcFailure(s"Exception occurred calling rpc $methodName")
}
}

Expand All @@ -43,9 +43,9 @@ class DefaultRpcHandler extends RpcHandler with StrictLogging {
val namedPars = rpc.namedParams
val module = Option(msg).map(_.module).getOrElse("")

processRpcMethodHandler(method, new RpcParams(params, namedPars, None, ctx)) match {
case result: RpcMethodSuccess => Some(VsMsg(ctx.requestId, ctx.session.sessionId, ctx.token, ctx.session.user, RpcResponse(method, result.optionalResult.orNull, error = null), module))
case error: RpcMethodFailure => Some(VsMsg(ctx.requestId, ctx.session.sessionId, ctx.token, ctx.session.user, RpcResponse(rpc.method, null, Error(error.error, error.code)), module))
processRpcMethodHandler(method, new RpcParams(params, namedPars, None, None, ctx)) match {
case result: RpcFunctionSuccess => Some(VsMsg(ctx.requestId, ctx.session.sessionId, ctx.token, ctx.session.user, RpcResponse(method, result.optionalResult.orNull, error = null), module))
case error: RpcFunctionFailure => Some(VsMsg(ctx.requestId, ctx.session.sessionId, ctx.token, ctx.session.user, RpcResponse(rpc.method, null, Error(error.error, error.code)), module))
}
}

Expand All @@ -57,10 +57,14 @@ class DefaultRpcHandler extends RpcHandler with StrictLogging {
} catch {
case e: Exception =>
logger.error(s"Error processing rpc method $methodName", e)
RpcMethodFailure(1, e.getMessage, e)
RpcFunctionFailure(1, e.getMessage, e)
}
} else {
new RpcMethodFailure(s"Could not find rpcMethodHandler $methodName")
new RpcFunctionFailure(s"Could not find rpcMethodHandler $methodName")
}
}

override def processRpcRequest(rpcName: String, params: RpcParams): RpcFunctionResult =
processRpcMethodHandler(rpcName, params)

}
2 changes: 1 addition & 1 deletion vuu/src/main/scala/org/finos/vuu/net/rpc/Rpc.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.finos.vuu.net.rpc

object Rpc {
type Function = RpcParams => RpcMethodCallResult
type Function = RpcParams => RpcFunctionResult
type FunctionName = String
}
2 changes: 2 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/net/rpc/RpcHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ trait RpcHandler extends StrictLogging {

val methodsAndParams: Map[String, Array[(String, Array[Type], Method)]] = this.getClass.getMethods.map(method => (method.getName, method.getGenericParameterTypes, method)).groupBy(_._1)

def processRpcRequest(rpcName: String, params: RpcParams): RpcFunctionResult = ???

def processViewPortRpcCall(methodName: String, rcpParams: RpcParams):ViewPortAction = {

val (params, ctx) = (rcpParams.params, rcpParams.ctx)
Expand Down
10 changes: 6 additions & 4 deletions vuu/src/main/scala/org/finos/vuu/net/rpc/RpcMethodHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import org.finos.vuu.viewport.ViewPortColumns
class RpcParams(
val params: Array[Any],
val namedParams: Map[String, Any],
val data: Option[Any],
val viewPortColumns: Option[ViewPortColumns],
val ctx: RequestContext)

trait RpcMethodCallResult {}
trait RpcFunctionResult {}

case class RpcFunctionSuccess(optionalResult: Option[Any]) extends RpcFunctionResult {
def this(result: Any) = this(Some(result))

case class RpcMethodSuccess(optionalResult: Option[Any]) extends RpcMethodCallResult {
def this(result:Any) = this(Some(result))
def this() = this(None)
}

case class RpcMethodFailure(code: Int, error: String, exception: Exception) extends RpcMethodCallResult {
case class RpcFunctionFailure(code: Int, error: String, exception: Exception) extends RpcFunctionResult {
def this(error: String) = this(1, error, null)
}
15 changes: 13 additions & 2 deletions vuu/src/main/scala/org/finos/vuu/viewport/ViewPortContainer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.finos.vuu.core.table.{DataTable, SessionTable, TableContainer}
import org.finos.vuu.core.tree.TreeSessionTableImpl
import org.finos.vuu.feature.EmptyViewPortKeys
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys
import org.finos.vuu.net.rpc.{EditRpcHandler, RpcParams}
import org.finos.vuu.net.rpc.{EditRpcHandler, RpcFunctionResult, RpcParams}
import org.finos.vuu.net.{ClientSessionId, FilterSpec, RequestContext, SortSpec}
import org.finos.vuu.plugin.PluginRegistry
import org.finos.vuu.provider.{Provider, ProviderContainer}
Expand Down Expand Up @@ -77,14 +77,25 @@ class ViewPortContainer(val tableContainer: TableContainer, val providerContaine
treeNodeStatesByVp.get(vpId)
}

def handleRpcRequest(viewPortId: String, rpcName: String, params: Any) (ctx: RequestContext): RpcFunctionResult = {
val viewPort = this.getViewPortById(viewPortId)

if(viewPort == null)
throw new Exception(s"No viewport $viewPortId found for RPC Call for $rpcName")

val viewPortDef = viewPort.getStructure.viewPortDef

viewPortDef.service.processRpcRequest(rpcName, new RpcParams(null, null, Some(params), Some(viewPort.getColumns), ctx))
}

def callRpcService(vpId: String, method: String, params: Array[Any], namedParams: Map[String, Any], session: ClientSessionId)(ctx: RequestContext): ViewPortAction = {
val viewPort = this.getViewPortById(vpId)

if(viewPort == null)
throw new Exception(s"No viewport $vpId found for RPC Call for $method")

val viewPortDef = viewPort.getStructure.viewPortDef
viewPortDef.service.processViewPortRpcCall(method, new RpcParams(params, namedParams, Some(viewPort.getColumns), ctx))
viewPortDef.service.processViewPortRpcCall(method, new RpcParams(params, namedParams, None, Some(viewPort.getColumns), ctx))
}

def callRpcCell(vpId: String, rpcName: String, session: ClientSessionId, rowKey: String, field: String, singleValue: Object): ViewPortAction = {
Expand Down
Loading

0 comments on commit a8eb90a

Please sign in to comment.