@ -4,12 +4,6 @@ import android.annotation.SuppressLint
import android.text.TextUtils
import android.util.Base64
import android.util.Log
import com.gitee.xuankaicat.kmnkt.socket.MqttQuality
import com.gitee.xuankaicat.kmnkt.socket.dsl.mqtt
import com.gitee.xuankaicat.kmnkt.socket.dsl.tcp
import com.gitee.xuankaicat.kmnkt.socket.dsl.udp
import com.gitee.xuankaicat.kmnkt.socket.open
import com.gitee.xuankaicat.kmnkt.socket.utils.Charset
import com.google.gson.Gson
import com.idormy.sms.forwarder.database.entity.Rule
import com.idormy.sms.forwarder.entity.MsgInfo
@ -17,7 +11,20 @@ import com.idormy.sms.forwarder.entity.setting.SocketSetting
import com.idormy.sms.forwarder.utils.SendUtils
import com.idormy.sms.forwarder.utils.SettingUtils
import com.xuexiang.xutil.app.AppUtils
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import java.io.BufferedReader
import java.io.BufferedWriter
import java.io.InputStreamReader
import java.io.OutputStreamWriter
import java.net.Socket
import java.net.URLEncoder
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.*
@ -62,113 +69,100 @@ class SocketUtils {
}
if ( setting . method == " TCP " || setting . method == " UDP " ) {
var isReceived = false
var isConnected = false
val socket = if ( setting . method == " TCP " ) {
tcp {
address = setting . address //设置ip地址
port = setting . port //设置端口号
if ( ! TextUtils . isEmpty ( setting . inCharset ) ) inCharset = Charset . forName ( setting . inCharset ) //设置输入编码
if ( ! TextUtils . isEmpty ( setting . outCharset ) ) outCharset = Charset . forName ( setting . outCharset ) //设置输出编码
}
} else {
udp {
address = setting . address //设置ip地址
port = setting . port //设置端口号
if ( ! TextUtils . isEmpty ( setting . inCharset ) ) inCharset = Charset . forName ( setting . inCharset ) //设置输入编码
if ( ! TextUtils . isEmpty ( setting . outCharset ) ) outCharset = Charset . forName ( setting . outCharset ) //设置输出编码
}
}
socket . open {
success {
Log . d ( TAG , " ${setting.method} 连接成功 " )
isConnected = true
//SendUtils.updateLogs(logId, 1, "TCP连接成功")
socket . send ( message )
socket . startReceive { str , data ->
isReceived = true
Log . d ( TAG , " str= $str ,data= $data " )
SendUtils . updateLogs ( logId , 2 , " 收到订阅消息: str= $str ,data= $data " )
SendUtils . senderLogic ( 2 , msgInfo , rule , senderIndex , msgId )
return @startReceive false
}
}
failure {
Log . d ( TAG , " ${setting.method} 连接失败 " )
val status = 0
SendUtils . updateLogs ( logId , status , " TCP连接失败 " )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
return @failure false //是否继续尝试连接
}
loss {
Log . d ( TAG , " ${setting.method} 连接断开 " )
return @loss true //是否尝试重连
}
}
//延时5秒关闭连接
if ( isConnected ) {
Thread . sleep ( 10000 )
socket . stopReceive ( )
// 创建套接字并连接到服务器
val socket = Socket ( setting . address , setting . port )
Log . d ( TAG , " 连接到服务器: ${setting.address} : ${setting.port} " )
try {
// 获取输入流和输出流, 设置字符集为UTF-8
val input = BufferedReader ( InputStreamReader ( socket . getInputStream ( ) , Charset . forName ( setting . inCharset ) ) )
val output = BufferedWriter ( OutputStreamWriter ( socket . getOutputStream ( ) , Charset . forName ( setting . outCharset ) ) )
// 向服务器发送数据
output . write ( message )
output . newLine ( ) // 添加换行符以便服务器使用readLine()来读取
output . flush ( )
Log . d ( TAG , " 发送到服务器的消息: $message " )
// 从服务器接收响应
val response = input . readLine ( )
Log . d ( TAG , " 从服务器接收的响应: $response " )
val status = if ( ! setting . response . isNullOrEmpty ( ) && ! response . contains ( setting . response ) ) 0 else 2
SendUtils . updateLogs ( logId , status , response )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
} catch ( e : Exception ) {
e . printStackTrace ( )
val status = 0
SendUtils . updateLogs ( logId , status , e . message . toString ( ) )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
} finally {
// 关闭套接字
socket . close ( )
if ( !is Received ) {
SendUtils . updateLogs ( logId , 0 , " 未收到订阅消息 " )
SendUtils . senderLogic ( 0 , msgInfo , rule , senderIndex , msgId )
}
Log . d ( TAG , " Disconnected from MQTT broker " )
}
return
} else if ( setting . method == " MQTT " ) {
val mqtt = mqtt {
address = setting . address //设置ip地址
port = setting . port //设置端口号
if ( ! TextUtils . isEmpty ( setting . inCharset ) ) inCharset = Charset . forName ( setting . inCharset ) //设置输入编码
if ( ! TextUtils . isEmpty ( setting . outCharset ) ) outCharset = Charset . forName ( setting . outCharset ) //设置输出编码
if ( ! TextUtils . isEmpty ( setting . username ) ) username = setting . username
if ( ! TextUtils . isEmpty ( setting . password ) ) password = setting . password
if ( ! TextUtils . isEmpty ( setting . inMessageTopic ) ) inMessageTopic = setting . inMessageTopic
if ( ! TextUtils . isEmpty ( setting . outMessageTopic ) ) outMessageTopic = setting . outMessageTopic
//自定义配置
qos = MqttQuality . ExactlyOnce // 服务质量 详见MqttQuality
if ( ! TextUtils . isEmpty ( setting . uriType ) ) uriType = setting . uriType //通信方式 默认为tcp
if ( ! TextUtils . isEmpty ( setting . clientId ) ) clientId = setting . clientId //客户端ID, 如果为空则为随机值
timeOut = 10 //设置超时时间
cleanSession = true //断开连接后是否清楚缓存,如果清除缓存则在重连后需要手动恢复订阅。
keepAliveInterval = 20 //检测连接是否中断的间隔
//行为配置
threadLock = false //是否启用线程同步锁 默认false
// MQTT 连接参数
val uriType = if ( TextUtils . isEmpty ( setting . uriType ) ) " tcp " else setting . uriType
val brokerUrl = " ${uriType} :// ${setting.address} : ${setting.port} "
if ( ! TextUtils . isEmpty ( setting . path ) ) {
brokerUrl . plus ( setting . path )
}
mqtt . open {
success {
Log . d ( TAG , " MQTT连接成功 " )
//SendUtils.updateLogs(logId, 1, "MQTT连接成功")
// 订阅并发布后等待至拿到响应消息并赋值给result
// 如果超过10秒没有收到消息则将result设为"消息响应超时", 并取消订阅topic
val response = mqtt . sendAndReceiveSync ( setting . outMessageTopic , setting . inMessageTopic , message , 10000L ) ?: " 消息响应超时 "
mqtt . close ( )
val status = if ( response == " 消息响应超时 " ) 0 else 2
SendUtils . updateLogs ( logId , status , " 收到订阅消息: $response " )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
return @success
}
failure {
Log . d ( TAG , " MQTT连接失败 " )
val status = 0
SendUtils . updateLogs ( logId , status , " MQTT连接失败 " )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
return @failure false //是否继续尝试连接
Log . d ( TAG , " MQTT brokerUrl: $brokerUrl " )
val clientId = if ( TextUtils . isEmpty ( setting . clientId ) ) UUID . randomUUID ( ) . toString ( ) else setting . clientId
val mqttClient = MqttClient ( brokerUrl , clientId , MemoryPersistence ( ) )
try {
val options = MqttConnectOptions ( )
if ( ! TextUtils . isEmpty ( setting . username ) ) {
options . userName = setting . username
}
loss {
Log . d ( TAG , " MQTT失去连接 " )
return @loss true //是否尝试重连
if ( ! TextUtils . isEmpty ( setting . password ) ) {
options . password = setting . password . toCharArray ( )
}
}
options . isCleanSession = true
mqttClient . connect ( options )
Log . d ( TAG , " Connected to MQTT broker: ${mqttClient.serverURI} " )
mqttClient . subscribe ( setting . inMessageTopic )
Log . d ( TAG , " Subscribed to topic: $setting .inMessageTopic " )
val outMessage = message . toByteArray ( Charset . forName ( setting . outCharset ) )
val mqttMessage = MqttMessage ( outMessage )
mqttMessage . qos = 0 // 设置消息质量服务等级
//异步发布消息
mqttClient . publish ( setting . outMessageTopic , mqttMessage )
Log . d ( TAG , " Published message to topic: $setting .outMessageTopic " )
mqttClient . setCallback ( object : MqttCallbackExtended {
override fun connectionLost ( cause : Throwable ? ) {
val response = " Connection to MQTT broker lost: ${cause?.message} "
Log . d ( TAG , response )
SendUtils . updateLogs ( logId , 0 , response )
SendUtils . senderLogic ( 0 , msgInfo , rule , senderIndex , msgId )
}
}
override fun messageArrived ( topic : String ? , inMessage : MqttMessage ? ) {
val payload = inMessage ?. payload ?. toString ( Charset . forName ( setting . inCharset ) )
Log . d ( TAG , " Received message on topic $topic : $payload " )
val status = if ( ! setting . response . isNullOrEmpty ( ) && ! payload ?. contains ( setting . response ) !! ) 0 else 2
SendUtils . updateLogs ( logId , status , payload . toString ( ) )
SendUtils . senderLogic ( status , msgInfo , rule , senderIndex , msgId )
}
override fun deliveryComplete ( token : IMqttDeliveryToken ? ) {
Log . d ( TAG , " deliveryComplete " )
SendUtils . updateLogs ( logId , 2 , " deliveryComplete " )
SendUtils . senderLogic ( 2 , msgInfo , rule , senderIndex , msgId )
}
override fun connectComplete ( reconnect : Boolean , serverURI : String ? ) {
Log . d ( TAG , " connectComplete " )
}
} )
} catch ( e : MqttException ) {
Log . d ( TAG , " An error occurred: ${e.message} " )
} finally {
mqttClient . disconnect ( )
Log . d ( TAG , " Disconnected from MQTT broker " )
}
}
}
//JSON需要转义的字符