-
Notifications
You must be signed in to change notification settings - Fork 389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recall pure socket #883
base: main
Are you sure you want to change the base?
Recall pure socket #883
Conversation
WalkthroughThis update primarily focuses on refactoring the WebSocket handling in the application. It introduces a new Changes
Uplevel your code reviews with Entelligence.AI ProEntelligence.AI ProIf you like this project, please support us by purchasing the Pro version. The Pro version has advanced context, superior noise reduction and several proprietary improvements compared to the open source version. Moreover, Entelligence.AI Pro is free for open source projects. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (1)
- app/lib/utils/pure_socket.dart (1)
Files not summarized due to errors (1)
- app/lib/utils/pure_socket.dart (diff tokens exceeds limit)
Review comments generated (8)
- Review: 6
- LGTM: 2
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
enum PureSocketStatus { notConnected, connecting, connected, disconnected } | ||
|
||
abstract class IPureSocketListener { | ||
void onMessage(dynamic message); | ||
void onClosed(); | ||
void onError(Object err, StackTrace trace); | ||
|
||
void onInternetConnectionFailed() {} | ||
|
||
void onMaxRetriesReach() {} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
app/lib/utils/pure_socket.dart
Outdated
|
||
abstract class IPureSocket { | ||
Future<bool> connect(); | ||
void disconnect(); | ||
void send(dynamic message); | ||
|
||
void onInternetSatusChanged(InternetStatus status); | ||
|
||
void onMessage(dynamic message); | ||
void onClosed(); | ||
void onError(Object err, StackTrace trace); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
app/lib/utils/pure_socket.dart
Outdated
class PureCore { | ||
late InternetConnection internetConnection; | ||
|
||
factory PureCore() => _instance; | ||
|
||
/// The singleton instance of [PureCore]. | ||
static final _instance = PureCore.createInstance(); | ||
|
||
PureCore.createInstance() { | ||
internetConnection = InternetConnection.createInstance( | ||
customCheckOptions: [ | ||
InternetCheckOption( | ||
uri: Uri.parse(Env.apiBaseUrl!), | ||
timeout: const Duration( | ||
seconds: 30, | ||
)), | ||
], | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
app/lib/utils/pure_socket.dart
Outdated
return true; | ||
} | ||
|
||
@override | ||
void disconnect() { | ||
_status = PureSocketStatus.disconnected; | ||
_cleanUp(); | ||
} | ||
|
||
Future _cleanUp() async { | ||
_internetLostDelayTimer?.cancel(); | ||
_internetStatusListener?.cancel(); | ||
await _channel?.sink.close(status.goingAway); | ||
} | ||
|
||
@override | ||
void onClosed() { | ||
_status = PureSocketStatus.disconnected; | ||
debugPrint("Socket closed"); | ||
_listener?.onClosed(); | ||
} | ||
|
||
@override | ||
void onError(Object err, StackTrace trace) { | ||
_status = PureSocketStatus.disconnected; | ||
print("Error: ${err}"); | ||
debugPrintStack(stackTrace: trace); | ||
|
||
_listener?.onError(err, trace); | ||
|
||
CrashReporting.reportHandledCrash(err, trace, level: NonFatalExceptionLevel.error); | ||
} | ||
|
||
@override | ||
void onMessage(dynamic message) { | ||
debugPrint("[Socket] Message $message"); | ||
_listener?.onMessage(message); | ||
} | ||
|
||
@override | ||
void send(message) { | ||
_channel?.sink.add(message); | ||
} | ||
|
||
void _reconnect() async { | ||
const int initialBackoffTimeMs = 1000; // 1 second | ||
const double multiplier = 1.5; | ||
const int maxRetries = 7; | ||
|
||
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) { | ||
debugPrint("[Socket] Can not reconnect, because socket is $_status"); | ||
return; | ||
} | ||
|
||
await _cleanUp(); | ||
|
||
var ok = await _connect(); | ||
if (ok) { | ||
return; | ||
} | ||
|
||
// retry | ||
int waitInMilliseconds = pow(multiplier, _retries).toInt() * initialBackoffTimeMs; | ||
await Future.delayed(Duration(milliseconds: waitInMilliseconds)); | ||
_retries++; | ||
if (_retries >= maxRetries) { | ||
debugPrint("[Socket] Reach max retries $maxRetries"); | ||
_listener?.onMaxRetriesReach(); | ||
return; | ||
} | ||
_reconnect(); | ||
} | ||
|
||
@override | ||
void onInternetSatusChanged(InternetStatus status) { | ||
_internetStatus = status; | ||
switch (status) { | ||
case InternetStatus.connected: | ||
if (_status == PureSocketStatus.connected || _status == PureSocketStatus.connecting) { | ||
return; | ||
} | ||
_reconnect(); | ||
break; | ||
case InternetStatus.disconnected: | ||
var that = this; | ||
_internetLostDelayTimer?.cancel(); | ||
_internetLostDelayTimer = Timer(const Duration(seconds: 60), () { | ||
if (_internetStatus != InternetStatus.disconnected) { | ||
return; | ||
} | ||
|
||
that.disconnect(); | ||
_listener?.onInternetConnectionFailed(); | ||
}); | ||
|
||
break; | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PureSocket
class implements the IPureSocket
interface and handles socket connections, messages, errors, and disconnections. It also handles internet connection status changes and retries when the connection fails. This class is well implemented, but there are some areas where error handling could be improved:
- In the
connect
and_connect
methods (lines 97-136), there's no error handling if the connection fails. Consider adding a try-catch block around the connection logic to handle any exceptions that might occur. - In the
disconnect
method (lines 139-142), there's no error handling if closing the sink fails. Consider adding a try-catch block around thesink.close
call to handle any exceptions that might occur. - In the
send
method (lines 175-177), there's no error handling if adding a message to the sink fails. Consider adding a try-catch block around thesink.add
call to handle any exceptions that might occur.
abstract interface class ITransctipSegmentSocketServiceListener { | ||
void onMessageEventReceived(ServerMessageEvent event); | ||
void onSegmentReceived(List<TranscriptSegment> segments); | ||
void onError(Object err); | ||
void onClosed(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
app/lib/utils/pure_socket.dart
Outdated
} | ||
|
||
void unsubscribe(Object context) { | ||
if (_listeners.containsKey(context.hashCode)) { | ||
_listeners.remove(context.hashCode); | ||
} | ||
} | ||
|
||
void start() { | ||
_socket.connect(); | ||
} | ||
|
||
void stop() { | ||
_socket.disconnect(); | ||
_listeners.clear(); | ||
} | ||
|
||
@override | ||
void onClosed() { | ||
_listeners.forEach((k, v) { | ||
v.onClosed(); | ||
}); | ||
} | ||
|
||
@override | ||
void onError(Object err, StackTrace trace) { | ||
_listeners.forEach((k, v) { | ||
v.onError(err); | ||
}); | ||
} | ||
|
||
@override | ||
void onMessage(event) { | ||
if (event == 'ping') return; | ||
|
||
// Decode json | ||
dynamic jsonEvent; | ||
try { | ||
jsonEvent = jsonDecode(event); | ||
} on FormatException catch (e) { | ||
debugPrint(e.toString()); | ||
} | ||
if (jsonEvent == null) { | ||
debugPrint("Can not decode message event json $event"); | ||
return; | ||
} | ||
|
||
// Transcript segments | ||
if (jsonEvent is List) { | ||
var segments = jsonEvent; | ||
if (segments.isNotEmpty) { | ||
return; | ||
} | ||
_listeners.forEach((k, v) { | ||
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event); | ||
|
||
// Message event | ||
if (jsonEvent.containsKey("type")) { | ||
var event = ServerMessageEvent.fromJson(jsonEvent); | ||
_listeners.forEach((k, v) { | ||
v.onMessageEventReceived(event); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event.toString()); | ||
} | ||
|
||
@override | ||
void onInternetConnectionFailed() { | ||
debugPrint("onInternetConnectionFailed"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(3); | ||
NotificationService.instance.createNotification( | ||
notificationId: 3, | ||
title: 'Internet Connection Lost', | ||
body: 'Your device is offline. Transcription is paused until connection is restored.', | ||
); | ||
} | ||
|
||
@override | ||
void onMaxRetriesReach() { | ||
debugPrint("onMaxRetriesReach"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(2); | ||
NotificationService.instance.createNotification( | ||
notificationId: 2, | ||
title: 'Connection Issue 🚨', | ||
body: 'Unable to connect to the transcript service.' | ||
' Please restart the app or contact support if the problem persists.', | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TranscripSegmentSocketService
class implements the IPureSocketListener
interface and handles socket messages, errors, and disconnections. It also manages subscriptions to message events. This class is well implemented, but there are some areas where error handling could be improved:
- In the
onMessage
method (lines 319-359), there's no error handling if decoding the JSON fails or if creating aServerMessageEvent
from the JSON fails. Consider adding try-catch blocks around these operations to handle any exceptions that might occur. - In the
start
andstop
methods (lines 296-303), there's no error handling if connecting or disconnecting the socket fails. Consider adding try-catch blocks around these operations to handle any exceptions that might occur.
Also, there's a typo in the name of the class. It should be TranscriptSegmentSocketService
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (6)
- app/lib/backend/schema/bt_device.dart (1)
- app/lib/main.dart (1)
- app/lib/providers/capture_provider.dart (16)
- app/lib/services/services.dart (3)
- app/lib/services/sockets.dart (1)
- app/lib/utils/pure_socket.dart (1)
Files not summarized due to errors (2)
- app/lib/providers/capture_provider.dart (diff tokens exceeds limit)
- app/lib/utils/pure_socket.dart (diff tokens exceeds limit)
Review comments generated (33)
- Review: 17
- LGTM: 16
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
@@ -168,7 +168,7 @@ class _MyAppState extends State<MyApp> with WidgetsBindingObserver { | |||
ChangeNotifierProxyProvider3<MemoryProvider, MessageProvider, WebSocketProvider, CaptureProvider>( | |||
create: (context) => CaptureProvider(), | |||
update: (BuildContext context, memory, message, wsProvider, CaptureProvider? previous) => | |||
(previous?..updateProviderInstances(memory, message, wsProvider)) ?? CaptureProvider(), | |||
(previous?..updateProviderInstances(memory, message)) ?? CaptureProvider(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wsProvider
argument has been removed from the updateProviderInstances
method call. This could potentially cause issues if the CaptureProvider
class's updateProviderInstances
method still expects three arguments. Please ensure that the method signature for updateProviderInstances
in the CaptureProvider
class has been updated to match this change.
- (previous?..updateProviderInstances(memory, message, wsProvider)) ?? CaptureProvider(),
+ (previous?..updateProviderInstances(memory, message)) ?? CaptureProvider(),
import 'package:friend_private/utils/pure_socket.dart'; | ||
import 'package:permission_handler/permission_handler.dart'; | ||
import 'package:uuid/uuid.dart'; | ||
|
||
class CaptureProvider extends ChangeNotifier with OpenGlassMixin, MessageNotifierMixin { | ||
class CaptureProvider extends ChangeNotifier | ||
with OpenGlassMixin, MessageNotifierMixin | ||
implements ITransctipSegmentSocketServiceListener { | ||
MemoryProvider? memoryProvider; | ||
MessageProvider? messageProvider; | ||
WebSocketProvider? webSocketProvider; | ||
TranscripSegmentSocketService? _socket; | ||
|
||
void updateProviderInstances(MemoryProvider? mp, MessageProvider? p, WebSocketProvider? wsProvider) { | ||
void updateProviderInstances(MemoryProvider? mp, MessageProvider? p) { | ||
memoryProvider = mp; | ||
messageProvider = p; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removal of WebSocketProvider
and the introduction of TranscripSegmentSocketService
is a significant change. Ensure that all functionalities previously handled by WebSocketProvider
are now correctly managed by TranscripSegmentSocketService
. Also, make sure to update any other parts of the codebase that might be using WebSocketProvider
.
print('closeCode: $closeCode'); | ||
// connection was closed, either on resetState, or by backend, or by some other reason. | ||
// setState(() {}); | ||
}, | ||
onConnectionError: (err) { | ||
print('inside onConnectionError'); | ||
print('err: $err'); | ||
// connection was okay, but then failed. | ||
notifyListeners(); | ||
}, | ||
onMessageEventReceived: (ServerMessageEvent event) { | ||
if (event.type == MessageEventType.newMemoryCreating) { | ||
_onMemoryCreating(); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.newMemoryCreated) { | ||
_onMemoryCreated(event); | ||
return; | ||
} | ||
debugPrint('is ws null: ${_socket == null}'); | ||
|
||
if (event.type == MessageEventType.newMemoryCreateFailed) { | ||
_onMemoryCreateFailed(); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.newProcessingMemoryCreated) { | ||
if (event.processingMemoryId == null) { | ||
print("New processing memory created message event is invalid"); | ||
return; | ||
} | ||
_onProcessingMemoryCreated(event.processingMemoryId!); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingSuccess) { | ||
if (event.memoryId == null) { | ||
print("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessSuccess(event.memoryId!); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingFailed) { | ||
if (event.memoryId == null) { | ||
print("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessFailed(event.memoryId!); | ||
return; | ||
} | ||
}, | ||
onMessageReceived: (List<TranscriptSegment> newSegments) { | ||
if (newSegments.isEmpty) return; | ||
|
||
if (segments.isEmpty) { | ||
debugPrint('newSegments: ${newSegments.last}'); | ||
// TODO: small bug -> when memory A creates, and memory B starts, memory B will clean a lot more seconds than available, | ||
// losing from the audio the first part of the recording. All other parts are fine. | ||
FlutterForegroundTask.sendDataToTask(jsonEncode({'location': true})); | ||
var currentSeconds = (audioStorage?.frames.length ?? 0) ~/ 100; | ||
var removeUpToSecond = newSegments[0].start.toInt(); | ||
audioStorage?.removeFramesRange(fromSecond: 0, toSecond: min(max(currentSeconds - 5, 0), removeUpToSecond)); | ||
firstStreamReceivedAt = DateTime.now(); | ||
} | ||
// TODO: thinh, socket | ||
_socket = await ServiceManager.instance().socket.memory(codec: codec, sampleRate: sampleRate); | ||
if (_socket == null) { | ||
throw Exception("Can not create new memory socket"); | ||
} | ||
|
||
streamStartedAtSecond ??= newSegments[0].start; | ||
TranscriptSegment.combineSegments( | ||
segments, | ||
newSegments, | ||
toRemoveSeconds: streamStartedAtSecond ?? 0, | ||
toAddSeconds: secondsMissedOnReconnect ?? 0, | ||
); | ||
triggerTranscriptSegmentReceivedEvents(newSegments, conversationId, sendMessageToChat: (v) { | ||
messageProvider?.addMessage(v); | ||
}); | ||
|
||
debugPrint('Memory creation timer restarted'); | ||
_memoryCreationTimer?.cancel(); | ||
_memoryCreationTimer = | ||
Timer(const Duration(seconds: quietSecondsForMemoryCreation), () => _createPhotoCharacteristicMemory()); | ||
setHasTranscripts(true); | ||
notifyListeners(); | ||
}, | ||
); | ||
// Ok | ||
_socket?.subscribe(this, this); | ||
if (segments.isNotEmpty) { | ||
// means that it was a reconnection, so we need to reset | ||
streamStartedAtSecond = null; | ||
secondsMissedOnReconnect = (DateTime.now().difference(firstStreamReceivedAt!).inSeconds); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new implementation of initiateWebsocket
seems to lack error handling for when the socket connection fails or closes unexpectedly. In the old implementation, there were callbacks such as onConnectionFailed
, onConnectionClosed
, and onConnectionError
which handled these scenarios. Consider adding similar error handling mechanisms in your new implementation.
// Socket handling | ||
|
||
@override | ||
void onClosed() { | ||
debugPrint('socket is closed'); | ||
} | ||
|
||
@override | ||
void onError(Object err) { | ||
debugPrint('err: $err'); | ||
notifyListeners(); | ||
} | ||
|
||
@override | ||
void onMessageEventReceived(ServerMessageEvent event) { | ||
if (event.type == MessageEventType.newMemoryCreating) { | ||
_onMemoryCreating(); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.newMemoryCreated) { | ||
_onMemoryCreated(event); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.newMemoryCreateFailed) { | ||
_onMemoryCreateFailed(); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.newProcessingMemoryCreated) { | ||
if (event.processingMemoryId == null) { | ||
debugPrint("New processing memory created message event is invalid"); | ||
return; | ||
} | ||
_onProcessingMemoryCreated(event.processingMemoryId!); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingSuccess) { | ||
if (event.memoryId == null) { | ||
debugPrint("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessSuccess(event.memoryId!); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingFailed) { | ||
if (event.memoryId == null) { | ||
debugPrint("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessFailed(event.memoryId!); | ||
return; | ||
} | ||
} | ||
|
||
@override | ||
void onSegmentReceived(List<TranscriptSegment> newSegments) { | ||
if (newSegments.isEmpty) return; | ||
|
||
if (segments.isEmpty) { | ||
debugPrint('newSegments: ${newSegments.last}'); | ||
// TODO: small bug -> when memory A creates, and memory B starts, memory B will clean a lot more seconds than available, | ||
// losing from the audio the first part of the recording. All other parts are fine. | ||
FlutterForegroundTask.sendDataToTask(jsonEncode({'location': true})); | ||
var currentSeconds = (audioStorage?.frames.length ?? 0) ~/ 100; | ||
var removeUpToSecond = newSegments[0].start.toInt(); | ||
audioStorage?.removeFramesRange(fromSecond: 0, toSecond: min(max(currentSeconds - 5, 0), removeUpToSecond)); | ||
firstStreamReceivedAt = DateTime.now(); | ||
} | ||
|
||
streamStartedAtSecond ??= newSegments[0].start; | ||
TranscriptSegment.combineSegments( | ||
segments, | ||
newSegments, | ||
toRemoveSeconds: streamStartedAtSecond ?? 0, | ||
toAddSeconds: secondsMissedOnReconnect ?? 0, | ||
); | ||
triggerTranscriptSegmentReceivedEvents(newSegments, conversationId, sendMessageToChat: (v) { | ||
messageProvider?.addMessage(v); | ||
}); | ||
|
||
debugPrint('Memory creation timer restarted'); | ||
_memoryCreationTimer?.cancel(); | ||
_memoryCreationTimer = | ||
Timer(const Duration(seconds: quietSecondsForMemoryCreation), () => _createPhotoCharacteristicMemory()); | ||
setHasTranscripts(true); | ||
notifyListeners(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new implementation of the ITransctipSegmentSocketServiceListener
interface seems to be correct. However, ensure that all the methods are implemented correctly and handle all edge cases. For instance, in the onMessageEventReceived
method, you're checking if event.memoryId
is null before calling _onMemoryPostProcessSuccess
and _onMemoryPostProcessFailed
. Make sure this check is necessary and that event.memoryId
can indeed be null in some cases. If it's not supposed to be null, this could indicate an issue elsewhere in your code where the event is being created.
app/lib/services/sockets.dart
Outdated
} | ||
|
||
// Warn: Should use a better solution to prevent race conditions | ||
bool memoryMutex = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a boolean flag as a mutex can lead to race conditions if multiple threads access the memory
function simultaneously. Consider using a proper synchronization mechanism like a Semaphore
or Mutex
from the dart:isolate
package.
- bool memoryMutex = false;
+ final _memoryMutex = Mutex();
const int initialBackoffTimeMs = 1000; // 1 second | ||
const double multiplier = 1.5; | ||
const int maxRetries = 7; | ||
|
||
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) { | ||
debugPrint("[Socket] Can not reconnect, because socket is $_status"); | ||
return; | ||
} | ||
|
||
await _cleanUp(); | ||
|
||
var ok = await _connect(); | ||
if (ok) { | ||
return; | ||
} | ||
|
||
// retry | ||
int waitInMilliseconds = pow(multiplier, _retries).toInt() * initialBackoffTimeMs; | ||
await Future.delayed(Duration(milliseconds: waitInMilliseconds)); | ||
_retries++; | ||
if (_retries >= maxRetries) { | ||
debugPrint("[Socket] Reach max retries $maxRetries"); | ||
_listener?.onMaxRetriesReach(); | ||
return; | ||
} | ||
_reconnect(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _reconnect
method does not handle exceptions that might be thrown by _cleanUp
or _connect
. This could lead to unhandled exceptions in your application. Consider adding a try-catch block to handle potential exceptions.
+ try {
const int initialBackoffTimeMs = 1000; // 1 second
const double multiplier = 1.5;
const int maxRetries = 7;
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) {
debugPrint("[Socket] Can not reconnect, because socket is $_status");
return;
}
await _cleanUp();
var ok = await _connect();
if (ok) {
return;
}
// retry
int waitInMilliseconds = pow(multiplier, _retries).toInt() * initialBackoffTimeMs;
await Future.delayed(Duration(milliseconds: waitInMilliseconds));
_retries++;
if (_retries >= maxRetries) {
debugPrint("[Socket] Reach max retries $maxRetries");
_listener?.onMaxRetriesReach();
return;
}
_reconnect();
+ } catch (e) {
+ print('Error reconnecting to the socket: $e');
+ }
Future start() async { | ||
bool ok = await _socket.connect(); | ||
if (!ok) { | ||
debugPrint("Can not connect to websocket"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The start
method does not handle exceptions that might be thrown by _socket.connect
. This could lead to unhandled exceptions in your application. Consider adding a try-catch block to handle potential exceptions.
+ try {
bool ok = await _socket.connect();
if (!ok) {
debugPrint("Can not connect to websocket");
}
+ } catch (e) {
+ print('Error starting the socket: $e');
+ }
app/lib/utils/pure_socket.dart
Outdated
Future stop({String? reason}) async { | ||
await _socket.disconnect(); | ||
_listeners.clear(); | ||
|
||
if (reason != null) { | ||
debugPrint(reason); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The stop
method does not handle exceptions that might be thrown by _socket.disconnect
. This could lead to unhandled exceptions in your application. Consider adding a try-catch block to handle potential exceptions.
+ try {
await _socket.disconnect();
_listeners.clear();
if (reason != null) {
debugPrint(reason);
}
+ } catch (e) {
+ print('Error stopping the socket: $e');
+ }
Future send(dynamic message) async { | ||
_socket.send(message); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The send
method does not handle exceptions that might be thrown by _socket.send
. This could lead to unhandled exceptions in your application. Consider adding a try-catch block to handle potential exceptions.
+ try {
_socket.send(message);
return;
+ } catch (e) {
+ print('Error sending message through the socket: $e');
+ }
app/lib/utils/pure_socket.dart
Outdated
@override | ||
void onMessage(event) { | ||
if (event == 'ping') return; | ||
|
||
// Decode json | ||
dynamic jsonEvent; | ||
try { | ||
jsonEvent = jsonDecode(event); | ||
} on FormatException catch (e) { | ||
debugPrint(e.toString()); | ||
} | ||
if (jsonEvent == null) { | ||
debugPrint("Can not decode message event json $event"); | ||
return; | ||
} | ||
|
||
// Transcript segments | ||
if (jsonEvent is List) { | ||
var segments = jsonEvent; | ||
if (segments.isNotEmpty) { | ||
return; | ||
} | ||
_listeners.forEach((k, v) { | ||
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event); | ||
|
||
// Message event | ||
if (jsonEvent.containsKey("type")) { | ||
var event = ServerMessageEvent.fromJson(jsonEvent); | ||
_listeners.forEach((k, v) { | ||
v.onMessageEventReceived(event); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event.toString()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onMessage
method does not handle exceptions that might be thrown by jsonDecode
, TranscriptSegment.fromJson
, or ServerMessageEvent.fromJson
. This could lead to unhandled exceptions in your application. Consider adding a try-catch block to handle potential exceptions.
+ try {
if (event == 'ping') return;
// Decode json
dynamic jsonEvent;
try {
jsonEvent = jsonDecode(event);
} on FormatException catch (e) {
debugPrint(e.toString());
}
if (jsonEvent == null) {
debugPrint("Can not decode message event json $event");
return;
}
// Transcript segments
if (jsonEvent is List) {
var segments = jsonEvent;
if (segments.isNotEmpty) {
return;
}
_listeners.forEach((k, v) {
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList());
});
return;
}
debugPrint(event);
// Message event
if (jsonEvent.containsKey("type")) {
var event = ServerMessageEvent.fromJson(jsonEvent);
_listeners.forEach((k, v) {
v.onMessageEventReceived(event);
});
return;
}
debugPrint(event.toString());
+ } catch (e) {
+ print('Error handling message from the socket: $e');
+ }
47c60f0
to
c0f87f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (14)
- app/lib/backend/schema/bt_device.dart (1)
- app/lib/main.dart (1)
- app/lib/pages/capture/_page.dart (2)
- app/lib/pages/capture/widgets/widgets.dart (1)
- app/lib/pages/home/page.dart (1)
- app/lib/pages/memories/widgets/capture.dart (2)
- app/lib/pages/memories/widgets/processing_capture.dart (3)
- app/lib/providers/capture_provider.dart (18)
- app/lib/providers/device_provider.dart (6)
- app/lib/providers/speech_profile_provider.dart (2)
- app/lib/services/device_connections.dart (3)
- app/lib/services/services.dart (3)
- app/lib/services/sockets.dart (1)
- app/lib/utils/pure_socket.dart (1)
Files not summarized due to errors (3)
- app/lib/pages/capture/_page.dart (diff tokens exceeds limit)
- app/lib/providers/capture_provider.dart (diff tokens exceeds limit)
- app/lib/utils/pure_socket.dart (diff tokens exceeds limit)
Files skipped from review due to trivial changes (1)
- app/lib/pages/capture/widgets/widgets.dart
Review comments generated (26)
- Review: 21
- LGTM: 5
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
app/lib/pages/home/page.dart
Outdated
// TODO: thinh, socket speech profile | ||
// context.read<DeviceProvider>().restartWebSocket(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WebSocket restart logic has been commented out. If this is intentional and the functionality is being handled elsewhere, consider removing these lines entirely to avoid confusion. If the functionality is not handled elsewhere, this could lead to issues with the WebSocket connection not being properly restarted when necessary.
- // TODO: thinh, socket speech profile
- // context.read<DeviceProvider>().restartWebSocket();
app/lib/main.dart
Outdated
(previous?..updateProviderInstances(memory, message)) ?? CaptureProvider(), | ||
), | ||
ChangeNotifierProxyProvider2<CaptureProvider, WebSocketProvider, DeviceProvider>( | ||
create: (context) => DeviceProvider(), | ||
update: (BuildContext context, captureProvider, wsProvider, DeviceProvider? previous) => | ||
(previous?..setProviders(captureProvider, wsProvider)) ?? DeviceProvider(), | ||
(previous?..setProviders(captureProvider)) ?? DeviceProvider(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ChangeNotifierProxyProvider3
and ChangeNotifierProxyProvider2
functions have been modified to remove the wsProvider
parameter from their update
methods. This change could potentially lead to issues if the wsProvider
was being used in the updateProviderInstances
or setProviders
methods. Please ensure that these methods do not require the wsProvider
for their functionality, or refactor them accordingly.
- (previous?..updateProviderInstances(memory, message, wsProvider)) ?? CaptureProvider(),
+ (previous?..updateProviderInstances(memory, message)) ?? CaptureProvider(),
- (previous?..setProviders(captureProvider, wsProvider)) ?? DeviceProvider(),
+ (previous?..setProviders(captureProvider)) ?? DeviceProvider(),
// TODO: thinh, socket check why we need to close socket provider here, disable temporary | ||
//context.read<WebSocketProvider>().closeWebSocketWithoutReconnect('Recording with phone mic'); | ||
await provider.changeAudioRecordProfile(BleAudioCodec.pcm16, 16000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to close the WebSocket without reconnecting has been commented out and replaced with a call to changeAudioRecordProfile
. Make sure this doesn't introduce any unexpected behavior, especially if the WebSocket was being closed for a specific reason (e.g., to free up resources or prevent data leaks).
- context.read<WebSocketProvider>().closeWebSocketWithoutReconnect('Recording with phone mic');
- await provider.initiateWebsocket(BleAudioCodec.pcm16, 16000);
+ //context.read<WebSocketProvider>().closeWebSocketWithoutReconnect('Recording with phone mic');
+ await provider.changeAudioRecordProfile(BleAudioCodec.pcm16, 16000);
if (mounted) { | ||
context.read<CaptureProvider>().setGeolocation(Geolocation( | ||
latitude: data['latitude'], | ||
longitude: data['longitude'], | ||
accuracy: data['accuracy'], | ||
altitude: data['altitude'], | ||
time: DateTime.parse(data['time']), | ||
)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check for mounted
before setting the geolocation is a good practice to prevent calling setState
when the widget is no longer in the widget tree. This can help avoid unnecessary exceptions.
- context.read<CaptureProvider>().setGeolocation(Geolocation(
- latitude: data['latitude'],
- longitude: data['longitude'],
- accuracy: data['accuracy'],
- altitude: data['altitude'],
- time: DateTime.parse(data['time']),
- ));
+ if (mounted) {
+ context.read<CaptureProvider>().setGeolocation(Geolocation(
+ latitude: data['latitude'],
+ longitude: data['longitude'],
+ accuracy: data['accuracy'],
+ altitude: data['altitude'],
+ time: DateTime.parse(data['time']),
+ ));
+ }
@@ -139,8 +141,7 @@ class LiteCaptureWidgetState extends State<LiteCaptureWidget> | |||
context.read<WebSocketProvider>().closeWebSocketWithoutReconnect('Firmware change detected'); | |||
var connectedDevice = deviceProvider.connectedDevice; | |||
var codec = await _getAudioCodec(connectedDevice!.id); | |||
context.read<CaptureProvider>().resetState(restartBytesProcessing: true); | |||
context.read<CaptureProvider>().initiateWebsocket(codec); | |||
await context.read<CaptureProvider>().changeAudioRecordProfile(codec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The refactoring of resetting state and initiating the websocket into a single method changeAudioRecordProfile
improves code readability and maintainability by encapsulating related operations into one function.
- context.read<CaptureProvider>().resetState(restartBytesProcessing: true);
- context.read<CaptureProvider>().initiateWebsocket(codec);
+ await context.read<CaptureProvider>().changeAudioRecordProfile(codec);
setConnectedDevice(device); | ||
setIsConnected(true); | ||
updateConnectingStatus(false); | ||
await captureProvider?.resetState(restartBytesProcessing: true, btDevice: connectedDevice); | ||
await captureProvider?.streamDeviceRecording(restartBytesProcessing: true, btDevice: connectedDevice!); | ||
// initiateBleBatteryListener(); | ||
// The device is still disconnected for some reason | ||
if (connectedDevice != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [208-213]
The call to resetState
has been replaced with streamDeviceRecording
. This change seems to indicate a shift in how the application handles starting device recording. Ensure that this new approach covers all necessary aspects of starting the device recording.
- await captureProvider?.resetState(restartBytesProcessing: true, btDevice: connectedDevice);
+ await captureProvider?.streamDeviceRecording(restartBytesProcessing: true, btDevice: connectedDevice!);
app/lib/services/sockets.dart
Outdated
import 'package:flutter/material.dart'; | ||
import 'package:friend_private/backend/schema/bt_device.dart'; | ||
import 'package:friend_private/utils/pure_socket.dart'; | ||
|
||
abstract class ISocketService { | ||
void start(); | ||
void stop(); | ||
|
||
Future<TranscripSegmentSocketService?> memory( | ||
{required BleAudioCodec codec, required int sampleRate, bool force = false}); | ||
TranscripSegmentSocketService speechProfile(); | ||
} | ||
|
||
abstract interface class ISocketServiceSubsciption {} | ||
|
||
class SocketServicePool extends ISocketService { | ||
TranscripSegmentSocketService? _memory; | ||
TranscripSegmentSocketService? _speechProfile; | ||
|
||
@override | ||
void start() { | ||
// TODO: implement start | ||
} | ||
|
||
@override | ||
void stop() async { | ||
await _memory?.stop(); | ||
await _speechProfile?.stop(); | ||
} | ||
|
||
// Warn: Should use a better solution to prevent race conditions | ||
bool memoryMutex = false; | ||
@override | ||
Future<TranscripSegmentSocketService?> memory( | ||
{required BleAudioCodec codec, required int sampleRate, bool force = false}) async { | ||
while (memoryMutex) { | ||
await Future.delayed(const Duration(milliseconds: 50)); | ||
} | ||
memoryMutex = true; | ||
|
||
debugPrint("socket memory > $codec $sampleRate $force"); | ||
|
||
try { | ||
if (!force && | ||
_memory?.codec == codec && | ||
_memory?.sampleRate == sampleRate && | ||
_memory?.state == SocketServiceState.connected) { | ||
return _memory; | ||
} | ||
|
||
// new socket | ||
await _memory?.stop(); | ||
|
||
_memory = MemoryTranscripSegmentSocketService.create(sampleRate, codec); | ||
await _memory?.start(); | ||
if (_memory?.state != SocketServiceState.connected) { | ||
return null; | ||
} | ||
|
||
return _memory; | ||
} finally { | ||
memoryMutex = false; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
@override | ||
TranscripSegmentSocketService speechProfile() { | ||
// TODO: implement speechProfile | ||
throw UnimplementedError(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SocketServicePool
class has a memoryMutex
boolean flag to prevent race conditions when accessing the memory socket. However, this is not a reliable way to handle concurrency issues and can lead to unexpected behavior if multiple threads try to access the memory socket at the same time.
Instead of using a simple boolean flag, consider using a more robust synchronization mechanism like a Mutex or Semaphore from the dart:isolate
package. This will ensure that only one thread can access the critical section of code at a time, effectively preventing race conditions.
Here's an example of how you could use a Semaphore
:
+ import 'dart:isolate';
class SocketServicePool extends ISocketService {
TranscripSegmentSocketService? _memory;
TranscripSegmentSocketService? _speechProfile;
+ Semaphore _memorySemaphore = Semaphore(1);
@override
Future<TranscripSegmentSocketService?> memory(
{required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
- while (memoryMutex) {
- await Future.delayed(const Duration(milliseconds: 50));
- }
- memoryMutex = true;
+ await _memorySemaphore.acquire();
debugPrint("socket memory > $codec $sampleRate $force");
try {
// ... rest of your code ...
} finally {
- memoryMutex = false;
+ _memorySemaphore.release();
}
return null;
}
}
This change ensures that the memory
method is thread-safe and prevents potential race conditions.
_connectionStateChangedCallback = onConnectionStateChanged; | ||
_connectionStateSubscription = bleDevice.connectionState.listen((BluetoothConnectionState state) async { | ||
_onBleConnectionStateChanged(state, onConnectionStateChanged); | ||
_onBleConnectionStateChanged(state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The callback function _connectionStateChangedCallback
is now stored as a class member and used later in the code. This change seems to be more maintainable and less error-prone than passing the callback around. However, it's important to ensure that this callback is always set before it's used.
- _connectionStateSubscription = bleDevice.connectionState.listen((BluetoothConnectionState state) async {
- _onBleConnectionStateChanged(state, onConnectionStateChanged);
- });
+ _connectionStateChangedCallback = onConnectionStateChanged;
+ _connectionStateSubscription = bleDevice.connectionState.listen((BluetoothConnectionState state) async {
+ _onBleConnectionStateChanged(state);
+ });
void _onBleConnectionStateChanged(BluetoothConnectionState state) async { | ||
if (state == BluetoothConnectionState.disconnected && _connectionState == DeviceConnectionState.connected) { | ||
_connectionState = DeviceConnectionState.disconnected; | ||
await disconnect(callback: callback); | ||
await disconnect(); | ||
return; | ||
} | ||
|
||
if (state == BluetoothConnectionState.connected && _connectionState == DeviceConnectionState.disconnected) { | ||
_connectionState = DeviceConnectionState.connected; | ||
if (callback != null) { | ||
callback(device.id, _connectionState); | ||
if (_connectionStateChangedCallback != null) { | ||
_connectionStateChangedCallback!(device.id, _connectionState); | ||
} | ||
} | ||
} | ||
|
||
Future<void> disconnect({Function(String deviceId, DeviceConnectionState state)? callback}) async { | ||
Future<void> disconnect() async { | ||
_connectionState = DeviceConnectionState.disconnected; | ||
if (callback != null) { | ||
callback(device.id, _connectionState); | ||
if (_connectionStateChangedCallback != null) { | ||
_connectionStateChangedCallback!(device.id, _connectionState); | ||
_connectionStateChangedCallback = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes here seem to improve the readability of the code by removing the need to pass the callback function around. Instead, the callback is stored as a class member and used when needed. The disconnect method no longer accepts a callback parameter directly but uses the stored callback instead. This approach reduces the complexity of the code and makes it easier to understand.
- void _onBleConnectionStateChanged(
- BluetoothConnectionState state, Function(String deviceId, DeviceConnectionState state)? callback) async {
- if (state == BluetoothConnectionState.disconnected && _connectionState == DeviceConnectionState.connected) {
- _connectionState = DeviceConnectionState.disconnected;
- await disconnect(callback: callback);
- return;
- }
-
- if (state == BluetoothConnectionState.connected && _connectionState == DeviceConnectionState.disconnected) {
- _connectionState = DeviceConnectionState.connected;
- if (callback != null) {
- callback(device.id, _connectionState);
- }
- }
- }
-
- Future<void> disconnect({Function(String deviceId, DeviceConnectionState state)? callback}) async {
- _connectionState = DeviceConnectionState.disconnected;
- if (callback != null) {
- callback(device.id, _connectionState);
- }
- await bleDevice.disconnect();
- _connectionStateSubscription.cancel();
+ void _onBleConnectionStateChanged(BluetoothConnectionState state) async {
+ if (state == BluetoothConnectionState.disconnected && _connectionState == DeviceConnectionState.connected) {
+ _connectionState = DeviceConnectionState.disconnected;
+ await disconnect();
+ return;
+ }
+
+ if (state == BluetoothConnectionState.connected && _connectionState == DeviceConnectionState.disconnected) {
+ _connectionState = DeviceConnectionState.connected;
+ if (_connectionStateChangedCallback != null) {
+ _connectionStateChangedCallback!(device.id, _connectionState);
+ }
+ }
+ }
+
+ Future<void> disconnect() async {
+ _connectionState = DeviceConnectionState.disconnected;
+ if (_connectionStateChangedCallback != null) {
+ _connectionStateChangedCallback!(device.id, _connectionState);
+ _connectionStateChangedCallback = null;
+ }
+ await bleDevice.disconnect();
+ _connectionStateSubscription.cancel();
app/lib/utils/pure_socket.dart
Outdated
} | ||
|
||
Future stop({String? reason}) async { | ||
await _socket.disconnect(); | ||
_listeners.clear(); | ||
|
||
if (reason != null) { | ||
debugPrint(reason); | ||
} | ||
} | ||
|
||
Future send(dynamic message) async { | ||
_socket.send(message); | ||
return; | ||
} | ||
|
||
@override | ||
void onClosed() { | ||
_listeners.forEach((k, v) { | ||
v.onClosed(); | ||
}); | ||
} | ||
|
||
@override | ||
void onError(Object err, StackTrace trace) { | ||
_listeners.forEach((k, v) { | ||
v.onError(err); | ||
}); | ||
} | ||
|
||
@override | ||
void onMessage(event) { | ||
debugPrint("[TranscriptSegmentService] onMessage ${event}"); | ||
if (event == 'ping') return; | ||
|
||
// Decode json | ||
dynamic jsonEvent; | ||
try { | ||
jsonEvent = jsonDecode(event); | ||
} on FormatException catch (e) { | ||
debugPrint(e.toString()); | ||
} | ||
if (jsonEvent == null) { | ||
debugPrint("Can not decode message event json $event"); | ||
return; | ||
} | ||
|
||
// Transcript segments | ||
if (jsonEvent is List) { | ||
var segments = jsonEvent; | ||
if (segments.isEmpty) { | ||
return; | ||
} | ||
_listeners.forEach((k, v) { | ||
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event); | ||
|
||
// Message event | ||
if (jsonEvent.containsKey("type")) { | ||
var event = ServerMessageEvent.fromJson(jsonEvent); | ||
_listeners.forEach((k, v) { | ||
v.onMessageEventReceived(event); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event.toString()); | ||
} | ||
|
||
@override | ||
void onInternetConnectionFailed() { | ||
debugPrint("onInternetConnectionFailed"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(3); | ||
NotificationService.instance.createNotification( | ||
notificationId: 3, | ||
title: 'Internet Connection Lost', | ||
body: 'Your device is offline. Transcription is paused until connection is restored.', | ||
); | ||
} | ||
|
||
@override | ||
void onMaxRetriesReach() { | ||
debugPrint("onMaxRetriesReach"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(2); | ||
NotificationService.instance.createNotification( | ||
notificationId: 2, | ||
title: 'Connection Issue 🚨', | ||
body: 'Unable to connect to the transcript service.' | ||
' Please restart the app or contact support if the problem persists.', | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PureSocket
class has a _reconnect
method that uses exponential backoff to reconnect when the connection is lost. However, it doesn't reset the _retries
counter after a successful reconnection. This could lead to reaching the maximum number of retries more quickly in case of multiple disconnections.
+ Future<bool> _connect() async {
+ if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) {
+ return false;
+ }
+
+ _channel = IOWebSocketChannel.connect(
+ url,
+ pingInterval: const Duration(seconds: 10),
+ connectTimeout: const Duration(seconds: 30),
+ );
+ if (_channel?.ready == null) {
+ return false;
+ }
+
+ _status = PureSocketStatus.connecting;
+ await _channel?.ready;
+ _status = PureSocketStatus.connected;
+ _retries = 0; // Reset retries counter after successful connection
+
+ final that = this;
+
+ _channel?.stream.listen(
+ (message) {
+ that.onMessage(message);
+ },
+ onError: (err, trace) {
+ that.onError(err, trace);
+ },
+ onDone: () {
+ that.onClosed();
+ },
+ cancelOnError: true,
+ );
+
+ return true;
+ }
Also, the TranscripSegmentSocketService
class has a subscribe
method that allows listeners to subscribe to events. However, it doesn't check if the listener is already subscribed before adding it to the _listeners
map. This could lead to unexpected behavior if a listener subscribes multiple times.
+ void subscribe(Object context, ITransctipSegmentSocketServiceListener listener) {
+ if (_listeners.containsKey(context.hashCode)) {
+ throw Exception('Listener is already subscribed');
+ }
+ _listeners.putIfAbsent(context.hashCode, () => listener);
+ }
Lastly, the TranscripSegmentSocketService
class has a stop
method that clears all listeners. However, it doesn't check if the socket is connected before trying to disconnect. This could lead to an exception if the socket is not connected.
+ Future stop({String? reason}) async {
+ if (_socket.status != PureSocketStatus.connected) {
+ throw Exception('Socket is not connected');
+ }
+ await _socket.disconnect();
+ _listeners.clear();
+
+ if (reason != null) {
+ debugPrint(reason);
+ }
+ }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (2)
- app/lib/providers/capture_provider.dart (20)
- app/lib/providers/device_provider.dart (6)
Files not summarized due to errors (1)
- app/lib/providers/capture_provider.dart (diff tokens exceeds limit)
Review comments generated (19)
- Review: 12
- LGTM: 7
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
void setProviders(CaptureProvider provider) { | ||
captureProvider = provider; | ||
webSocketProvider = wsProvider; | ||
notifyListeners(); | ||
} | ||
|
||
void setConnectedDevice(BTDeviceStruct? device) { | ||
connectedDevice = device; | ||
print('setConnectedDevice: $device'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The setProviders
method has been updated to only accept a CaptureProvider
. This change seems to be in line with the removal of the WebSocketProvider
dependency. However, it's important to ensure that all calls to this method have been updated accordingly to pass only one argument.
- void setProviders(CaptureProvider provider, WebSocketProvider wsProvider) {
+ void setProviders(CaptureProvider provider) {
if (isConnected) { | ||
await initiateBleBatteryListener(); | ||
} | ||
await captureProvider?.resetState(restartBytesProcessing: true, btDevice: connectedDevice); | ||
// if (captureProvider?.webSocketConnected == false) { | ||
// restartWebSocket(); | ||
// } | ||
|
||
notifyListeners(); | ||
} | ||
|
||
Future restartWebSocket() async { | ||
debugPrint('restartWebSocket'); | ||
|
||
await webSocketProvider?.closeWebSocketWithoutReconnect('Restarting WebSocket'); | ||
if (connectedDevice == null) { | ||
return; | ||
} | ||
await captureProvider?.resetState(restartBytesProcessing: true); | ||
notifyListeners(); | ||
} | ||
|
||
void updateConnectingStatus(bool value) { | ||
isConnecting = value; | ||
notifyListeners(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The restartWebSocket
method has been removed and its functionality appears to have been integrated into other methods. It's crucial to verify that the new implementation correctly handles all scenarios previously covered by restartWebSocket
.
- Future restartWebSocket() async {
- debugPrint('restartWebSocket');
-
- await webSocketProvider?.closeWebSocketWithoutReconnect('Restarting WebSocket');
- if (connectedDevice == null) {
- return;
- }
- await captureProvider?.resetState(restartBytesProcessing: true);
- notifyListeners();
- }
await captureProvider?.stopStreamDeviceRecording(); | ||
captureProvider?.setAudioBytesConnected(false); | ||
print('after resetState inside initiateConnectionListener'); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resetState
call has been replaced with stopStreamDeviceRecording
. Ensure that this new method covers all necessary operations previously performed by resetState
.
- await captureProvider?.resetState(restartBytesProcessing: false);
+ await captureProvider?.stopStreamDeviceRecording();
setConnectedDevice(device); | ||
setIsConnected(true); | ||
updateConnectingStatus(false); | ||
await captureProvider?.resetState(restartBytesProcessing: true, btDevice: connectedDevice); | ||
await captureProvider?.streamDeviceRecording(restartBytesProcessing: true, device: connectedDevice); | ||
// initiateBleBatteryListener(); | ||
// The device is still disconnected for some reason | ||
if (connectedDevice != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [208-213]
The resetState
call has been replaced with streamDeviceRecording
. Make sure that this new method includes all necessary operations previously performed by resetState
.
- await captureProvider?.resetState(restartBytesProcessing: true, btDevice: connectedDevice);
+ await captureProvider?.streamDeviceRecording(restartBytesProcessing: true, device: connectedDevice);
void _updateRecordingDevice(BTDeviceStruct? device) { | ||
debugPrint('connected device changed from ${_recordingDevice?.id} to ${device?.id}'); | ||
_recordingDevice = device; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name has been changed from updateConnectedDevice
to _updateRecordingDevice
, and the variable connectedDevice
has been renamed to _recordingDevice
. This change is consistent with the rest of the changes in this PR, where the term "connected device" is being replaced with "recording device". It's important to ensure that this renaming is consistently applied throughout the codebase to avoid confusion.
} | ||
|
||
Future _cleanNew() async { | ||
_clean(); | ||
|
||
// Create new socket session | ||
// Warn: should have a better solution to keep the socket alived | ||
await webSocketProvider?.closeWebSocketWithoutReconnect('reset new memory session'); | ||
await initiateWebsocket(); | ||
debugPrint("_cleanNew"); | ||
await _initiateWebsocket(force: true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_onMemoryCreating(); | ||
return; | ||
} | ||
debugPrint("changeAudioRecordProfile"); | ||
await _resetState(restartBytesProcessing: true); | ||
await _initiateWebsocket(audioCodec: audioCodec, sampleRate: sampleRate); | ||
} | ||
|
||
if (event.type == MessageEventType.newMemoryCreated) { | ||
_onMemoryCreated(event); | ||
return; | ||
} | ||
Future<void> _initiateWebsocket({ | ||
BleAudioCodec? audioCodec, | ||
int? sampleRate, | ||
bool force = false, | ||
}) async { | ||
debugPrint('initiateWebsocket in capture_provider'); | ||
|
||
if (event.type == MessageEventType.newMemoryCreateFailed) { | ||
_onMemoryCreateFailed(); | ||
return; | ||
} | ||
BleAudioCodec codec = audioCodec ?? SharedPreferencesUtil().deviceCodec; | ||
sampleRate ??= (codec == BleAudioCodec.opus ? 16000 : 8000); | ||
|
||
if (event.type == MessageEventType.newProcessingMemoryCreated) { | ||
if (event.processingMemoryId == null) { | ||
print("New processing memory created message event is invalid"); | ||
return; | ||
} | ||
_onProcessingMemoryCreated(event.processingMemoryId!); | ||
return; | ||
} | ||
debugPrint('is ws null: ${_socket == null}'); | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingSuccess) { | ||
if (event.memoryId == null) { | ||
print("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessSuccess(event.memoryId!); | ||
return; | ||
} | ||
|
||
if (event.type == MessageEventType.memoryPostProcessingFailed) { | ||
if (event.memoryId == null) { | ||
print("Post proccess message event is invalid"); | ||
return; | ||
} | ||
_onMemoryPostProcessFailed(event.memoryId!); | ||
return; | ||
} | ||
}, | ||
onMessageReceived: (List<TranscriptSegment> newSegments) { | ||
if (newSegments.isEmpty) return; | ||
|
||
if (segments.isEmpty) { | ||
debugPrint('newSegments: ${newSegments.last}'); | ||
// TODO: small bug -> when memory A creates, and memory B starts, memory B will clean a lot more seconds than available, | ||
// losing from the audio the first part of the recording. All other parts are fine. | ||
FlutterForegroundTask.sendDataToTask(jsonEncode({'location': true})); | ||
var currentSeconds = (audioStorage?.frames.length ?? 0) ~/ 100; | ||
var removeUpToSecond = newSegments[0].start.toInt(); | ||
audioStorage?.removeFramesRange(fromSecond: 0, toSecond: min(max(currentSeconds - 5, 0), removeUpToSecond)); | ||
firstStreamReceivedAt = DateTime.now(); | ||
} | ||
// TODO: thinh, socket | ||
_socket = await ServiceManager.instance().socket.memory(codec: codec, sampleRate: sampleRate, force: force); | ||
if (_socket == null) { | ||
throw Exception("Can not create new memory socket"); | ||
} | ||
|
||
streamStartedAtSecond ??= newSegments[0].start; | ||
TranscriptSegment.combineSegments( | ||
segments, | ||
newSegments, | ||
toRemoveSeconds: streamStartedAtSecond ?? 0, | ||
toAddSeconds: secondsMissedOnReconnect ?? 0, | ||
); | ||
triggerTranscriptSegmentReceivedEvents(newSegments, conversationId, sendMessageToChat: (v) { | ||
messageProvider?.addMessage(v); | ||
}); | ||
|
||
debugPrint('Memory creation timer restarted'); | ||
_memoryCreationTimer?.cancel(); | ||
_memoryCreationTimer = | ||
Timer(const Duration(seconds: quietSecondsForMemoryCreation), () => _createPhotoCharacteristicMemory()); | ||
setHasTranscripts(true); | ||
notifyListeners(); | ||
}, | ||
); | ||
// Ok | ||
_socket?.subscribe(this, this); | ||
if (segments.isNotEmpty) { | ||
// means that it was a reconnection, so we need to reset | ||
streamStartedAtSecond = null; | ||
secondsMissedOnReconnect = (DateTime.now().difference(firstStreamReceivedAt!).inSeconds); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The WebSocket initialization logic has been significantly refactored. The new implementation appears to be simpler and more straightforward than the previous one. However, it's important to ensure that all necessary error handling and event handling logic is still present in the new implementation.
writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command); | ||
} | ||
|
||
Future clearFileFromDevice(int fileNum) async { | ||
storageUtil.fileNum = fileNum; | ||
int command = 1; | ||
writeToStorage(connectedDevice!.id, storageUtil.fileNum, command); | ||
writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method writeToStorage
is now called with _recordingDevice!.id
instead of connectedDevice!.id
. This change is consistent with the rest of the changes in this PR, where the term "connected device" is being replaced with "recording device". It's important to ensure that this renaming is consistently applied throughout the codebase to avoid confusion.
// TODO: thinh, socket, check why we need reset for speech profile here | ||
await _socket?.stop(reason: 'reset for speech profile'); | ||
setAudioBytesConnected(false); | ||
notifyListeners(); | ||
} | ||
|
||
Future<void> resetState({ | ||
Future<void> _resetState({ | ||
bool restartBytesProcessing = true, | ||
bool isFromSpeechProfile = false, | ||
BTDeviceStruct? btDevice, | ||
}) async { | ||
if (resetStateAlreadyCalled) { | ||
debugPrint('resetState already called'); | ||
return; | ||
} | ||
setResetStateAlreadyCalled(true); | ||
debugPrint('resetState: restartBytesProcessing=$restartBytesProcessing, isFromSpeechProfile=$isFromSpeechProfile'); | ||
debugPrint('resetState: restartBytesProcessing=$restartBytesProcessing'); | ||
|
||
_cleanupCurrentState(); | ||
await startOpenGlass(); | ||
if (!isFromSpeechProfile) { | ||
await _handleMemoryCreation(restartBytesProcessing); | ||
} | ||
|
||
bool codecChanged = await _checkCodecChange(); | ||
await _handleMemoryCreation(restartBytesProcessing); | ||
|
||
if (restartBytesProcessing || codecChanged) { | ||
await _manageWebSocketConnection(codecChanged, isFromSpeechProfile); | ||
} | ||
await _ensureSocketConnection(force: true); | ||
|
||
await initiateFriendAudioStreaming(isFromSpeechProfile); | ||
await startOpenGlass(); | ||
await _initiateFriendAudioStreaming(); | ||
// TODO: Commenting this for now as DevKit 2 is not yet used in production |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reset state logic has been significantly refactored. The new implementation appears to be simpler and more straightforward than the previous one. However, it's important to ensure that all necessary error handling and event handling logic is still present in the new implementation.
if (_recordingDevice != null) { | ||
BleAudioCodec newCodec = await _getAudioCodec(_recordingDevice!.id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method _getAudioCodec
is now called with _recordingDevice!.id
instead of connectedDevice!.id
. This change is consistent with the rest of the changes in this PR, where the term "connected device" is being replaced with "recording device". It's important to ensure that this renaming is consistently applied throughout the codebase to avoid confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (10)
- app/lib/main.dart (1)
- app/lib/pages/capture/widgets/widgets.dart (1)
- app/lib/pages/home/page.dart (3)
- app/lib/pages/memories/widgets/capture.dart (3)
- app/lib/pages/memories/widgets/processing_capture.dart (3)
- app/lib/pages/onboarding/speech_profile_widget.dart (1)
- app/lib/pages/speech_profile/page.dart (1)
- app/lib/providers/speech_profile_provider.dart (12)
- app/lib/providers/websocket_provider.dart (1)
- app/lib/services/sockets.dart (1)
Review comments generated (32)
- Review: 26
- LGTM: 6
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
@@ -207,7 +207,7 @@ class _SpeechProfileWidgetState extends State<SpeechProfileWidget> with TickerPr | |||
await provider.initialise(true); | |||
provider.forceCompletionTimer = | |||
Timer(Duration(seconds: provider.maxDuration), () async { | |||
provider.finalize(true); | |||
provider.finalize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The finalize
method call in the new hunk is missing a boolean parameter that was present in the old hunk. This could potentially cause issues if the finalize
method still expects this parameter. Please ensure that the method signature of finalize
has been updated accordingly, or pass the required parameter.
- provider.finalize();
+ provider.finalize(true);
// TODO: thinh, socket change settings | ||
// context.read<DeviceProvider>().restartWebSocket(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commented out line context.read<DeviceProvider>().restartWebSocket();
seems to be removed as part of the refactoring process. However, there is a TODO comment left here which indicates that the functionality for socket change settings needs to be implemented. This could potentially lead to issues if the socket settings are not properly updated when the user comes back. Please ensure that this functionality is added before merging the PR.
- // context.read<DeviceProvider>().restartWebSocket();
+ // Implement socket change settings here
@@ -325,7 +325,7 @@ class _SpeechProfilePageState extends State<SpeechProfilePage> with TickerProvid | |||
// 1.5 minutes seems reasonable | |||
provider.forceCompletionTimer = | |||
Timer(Duration(seconds: provider.maxDuration), () { | |||
provider.finalize(false); | |||
provider.finalize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var connectedDevice = deviceProvider.connectedDevice; | ||
var codec = await _getAudioCodec(connectedDevice!.id); | ||
context.read<CaptureProvider>().resetState(restartBytesProcessing: true); | ||
context.read<CaptureProvider>().initiateWebsocket(codec); | ||
await context.read<CaptureProvider>().changeAudioRecordProfile(codec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new code seems to be missing the functionality of closing the WebSocket without reconnecting when a firmware change is detected. This was present in the old code but has been removed in the new one. If this functionality is still required, it should be reintroduced.
+ context.read<SocketServicePool>().closeSocketWithoutReconnect('Firmware change detected');
var connectedDevice = deviceProvider.connectedDevice;
var codec = await _getAudioCodec(connectedDevice!.id);
await context.read<CaptureProvider>().changeAudioRecordProfile(codec);
Please ensure that SocketServicePool
has a method closeSocketWithoutReconnect
similar to the old WebSocketProvider
. If not, you may need to implement it.
@@ -7,7 +7,6 @@ import 'package:friend_private/pages/memory_capturing/page.dart'; | |||
import 'package:friend_private/providers/capture_provider.dart'; | |||
import 'package:friend_private/providers/connectivity_provider.dart'; | |||
import 'package:friend_private/providers/device_provider.dart'; | |||
import 'package:friend_private/providers/websocket_provider.dart'; | |||
import 'package:friend_private/utils/analytics/mixpanel.dart'; | |||
import 'package:friend_private/utils/enums.dart'; | |||
import 'package:friend_private/utils/other/temp.dart'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import statement for websocket_provider.dart
has been removed, which is consistent with the PR's goal of removing the dependency on WebSocketProvider. Ensure that all references to this provider have been properly refactored or removed in the rest of the codebase.
- import 'package:friend_private/providers/websocket_provider.dart';
@@ -296,8 +280,7 @@ class SpeechProfileProvider extends ChangeNotifier with MessageNotifierMixin imp | |||
percentageCompleted = 0; | |||
uploadingProfile = false; | |||
profileCompleted = false; | |||
await webSocketProvider?.closeWebSocketWithoutReconnect('closing'); | |||
await captureProvider?.resetState(restartBytesProcessing: true, isFromSpeechProfile: true); | |||
await _socket?.stop(reason: 'closing'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resetState
method has been updated to stop the new socket service instead of closing the old WebSocketProvider. This change is consistent with the removal of the WebSocketProvider dependency. However, the call to resetState
on the capture provider has been removed. If this functionality is necessary, it should be implemented elsewhere in the class.
@@ -355,6 +338,7 @@ class SpeechProfileProvider extends ChangeNotifier with MessageNotifierMixin imp | |||
connectionStateListener?.cancel(); | |||
_bleBytesStream?.cancel(); | |||
forceCompletionTimer?.cancel(); | |||
_socket?.unsubscribe(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void start() { | ||
// TODO: implement start | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The start
method in the SocketServicePool
class is not implemented. This could lead to unexpected behavior if it's called. If there's no specific implementation needed, consider removing this method or adding a comment explaining why it's empty.
- void start() {
- // TODO: implement start
- }
+ void start() {
+ // Implementation not required for this class
+ }
// Warn: Should use a better solution to prevent race conditions | ||
bool mutex = false; | ||
Future<TranscripSegmentSocketService?> socket( | ||
{required BleAudioCodec codec, required int sampleRate, bool force = false}) async { | ||
while (mutex) { | ||
await Future.delayed(const Duration(milliseconds: 50)); | ||
} | ||
mutex = true; | ||
|
||
try { | ||
if (!force && | ||
_socket?.codec == codec && | ||
_socket?.sampleRate == sampleRate && | ||
_socket?.state == SocketServiceState.connected) { | ||
return _socket; | ||
} | ||
|
||
// new socket | ||
await _socket?.stop(); | ||
|
||
_socket = MemoryTranscripSegmentSocketService.create(sampleRate, codec); | ||
await _socket?.start(); | ||
if (_socket?.state != SocketServiceState.connected) { | ||
return null; | ||
} | ||
|
||
return _socket; | ||
} finally { | ||
mutex = false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of a boolean flag as a mutex can lead to race conditions and is not recommended. Dart provides the Isolate
package which can be used to prevent race conditions by running code in separate threads. Alternatively, you can use a Semaphore
from the dart:async
package.
- bool mutex = false;
- Future<TranscripSegmentSocketService?> socket(
- {required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
- while (mutex) {
- await Future.delayed(const Duration(milliseconds: 50));
- }
- mutex = true;
-
- try {
- ...
- } finally {
- mutex = false;
- }
-
- return null;
- }
+ final _semaphore = Future<void>.value();
+ Future<TranscripSegmentSocketService?> socket(
+ {required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
+ await _semaphore;
+ final nextSemaphore = Completer<void>();
+ _semaphore = nextSemaphore.future;
+
+ try {
+ ...
+ } finally {
+ nextSemaphore.complete();
+ }
+
+ return null;
+ }
Future<TranscripSegmentSocketService?> speechProfile( | ||
{required BleAudioCodec codec, required int sampleRate, bool force = false}) async { | ||
debugPrint("socket speech profile > $codec $sampleRate $force"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The memory
and speechProfile
methods are identical and both call the socket
method with the same parameters. This violates the DRY (Don't Repeat Yourself) principle. Consider refactoring to eliminate redundancy.
- Future<TranscripSegmentSocketService?> memory(
- {required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
- debugPrint("socket memory > $codec $sampleRate $force");
- return await socket(codec: codec, sampleRate: sampleRate, force: force);
- }
-
- Future<TranscripSegmentSocketService?> speechProfile(
- {required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
- debugPrint("socket speech profile > $codec $sampleRate $force");
- return await socket(codec: codec, sampleRate: sampleRate, force: force);
- }
+ Future<TranscripSegmentSocketService?> getSocket(
+ {required String type, required BleAudioCodec codec, required int sampleRate, bool force = false}) async {
+ debugPrint("socket $type > $codec $sampleRate $force");
+ return await socket(codec: codec, sampleRate: sampleRate, force: force);
+ }
Then replace calls to memory
and speechProfile
with getSocket(type: 'memory', ...)
, and getSocket(type: 'speechProfile', ...)
, respectively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (5)
- app/lib/pages/capture/widgets/widgets.dart (2)
- app/lib/pages/home/page.dart (3)
- app/lib/providers/capture_provider.dart (20)
- app/lib/providers/speech_profile_provider.dart (12)
- app/lib/utils/pure_socket.dart (1)
Files not summarized due to errors (2)
- app/lib/providers/capture_provider.dart (diff tokens exceeds limit)
- app/lib/utils/pure_socket.dart (diff tokens exceeds limit)
Review comments generated (37)
- Review: 23
- LGTM: 14
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
@@ -4,6 +4,7 @@ import 'package:friend_private/backend/schema/bt_device.dart'; | |||
import 'package:friend_private/backend/schema/transcript_segment.dart'; | |||
import 'package:friend_private/pages/capture/connect.dart'; | |||
import 'package:friend_private/pages/speech_profile/page.dart'; | |||
import 'package:friend_private/providers/capture_provider.dart'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import of CaptureProvider
is added, which seems to be replacing the functionality previously provided by DeviceProvider
. Ensure that all methods and properties used from DeviceProvider
are available and correctly implemented in CaptureProvider
.
+ import 'package:friend_private/providers/capture_provider.dart';
@@ -227,8 +228,7 @@ class SpeechProfileCardWidget extends StatelessWidget { | |||
await routeToPage(context, const SpeechProfilePage()); | |||
if (hasSpeakerProfile != SharedPreferencesUtil().hasSpeakerProfile) { | |||
if (context.mounted) { | |||
// TODO: is the websocket restarting once the user comes back? | |||
context.read<DeviceProvider>().restartWebSocket(); | |||
context.read<CaptureProvider>().onRecordProfileSettingChanged(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method onRecordProfileSettingChanged
is being called on CaptureProvider
instead of restarting the WebSocket connection via DeviceProvider
. Make sure this method handles all necessary tasks that were previously performed by the WebSocket restart.
- context.read<DeviceProvider>().restartWebSocket();
+ context.read<CaptureProvider>().onRecordProfileSettingChanged();
@@ -13,6 +13,7 @@ import 'package:friend_private/pages/home/device.dart'; | |||
import 'package:friend_private/pages/memories/page.dart'; | |||
import 'package:friend_private/pages/plugins/page.dart'; | |||
import 'package:friend_private/pages/settings/page.dart'; | |||
import 'package:friend_private/providers/capture_provider.dart'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import of CaptureProvider
is added, which seems to be part of the refactoring process from using WebSocketProvider
to a new SocketServicePool
. Ensure that all instances where DeviceProvider
was used have been replaced with CaptureProvider
where necessary.
+ import 'package:friend_private/providers/capture_provider.dart';
@@ -21,6 +22,7 @@ import 'package:friend_private/providers/memory_provider.dart'; | |||
import 'package:friend_private/providers/message_provider.dart'; | |||
import 'package:friend_private/providers/plugin_provider.dart'; | |||
import 'package:friend_private/services/notification_service.dart'; | |||
import 'package:friend_private/services/services.dart'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (context.mounted) { | ||
context.read<CaptureProvider>().onRecordProfileSettingChanged(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to restartWebSocket()
has been replaced with onRecordProfileSettingChanged()
. This change seems to be part of the transition from WebSocketProvider
to SocketServicePool
. Ensure that onRecordProfileSettingChanged()
handles all the necessary operations previously performed by restartWebSocket()
.
- context.read<DeviceProvider>().restartWebSocket();
+ if (context.mounted) {
+ context.read<CaptureProvider>().onRecordProfileSettingChanged();
+ }
Future<bool> connect() async { | ||
return await _connect(); | ||
} | ||
|
||
Future<bool> _connect() async { | ||
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) { | ||
return false; | ||
} | ||
|
||
_channel = IOWebSocketChannel.connect( | ||
url, | ||
pingInterval: const Duration(seconds: 10), | ||
connectTimeout: const Duration(seconds: 30), | ||
); | ||
if (_channel?.ready == null) { | ||
return false; | ||
} | ||
|
||
_status = PureSocketStatus.connecting; | ||
await _channel?.ready; | ||
_status = PureSocketStatus.connected; | ||
_retries = 0; | ||
|
||
final that = this; | ||
|
||
_channel?.stream.listen( | ||
(message) { | ||
that.onMessage(message); | ||
}, | ||
onError: (err, trace) { | ||
that.onError(err, trace); | ||
}, | ||
onDone: () { | ||
that.onClosed(); | ||
}, | ||
cancelOnError: true, | ||
); | ||
|
||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _connect
method is not handling the case when the connection fails. If the connection fails, it will still set the status to PureSocketStatus.connected
and start listening on the channel. This could lead to unexpected behavior. You should add error handling to this method.
Future<bool> connect() async {
return await _connect();
}
Future<bool> _connect() async {
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) {
return false;
}
_channel = IOWebSocketChannel.connect(
url,
pingInterval: const Duration(seconds: 10),
connectTimeout: const Duration(seconds: 30),
);
if (_channel?.ready == null) {
return false;
}
_status = PureSocketStatus.connecting;
try {
await _channel?.ready;
_status = PureSocketStatus.connected;
_retries = 0;
} catch (e) {
_status = PureSocketStatus.disconnected;
return false;
}
final that = this;
_channel?.stream.listen(
(message) {
that.onMessage(message);
},
onError: (err, trace) {
that.onError(err, trace);
},
onDone: () {
that.onClosed();
},
cancelOnError: true,
);
return true;
}
app/lib/utils/pure_socket.dart
Outdated
void _reconnect() async { | ||
const int initialBackoffTimeMs = 1000; // 1 second | ||
const double multiplier = 1.5; | ||
const int maxRetries = 7; | ||
|
||
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) { | ||
debugPrint("[Socket] Can not reconnect, because socket is $_status"); | ||
return; | ||
} | ||
|
||
await _cleanUp(); | ||
|
||
var ok = await _connect(); | ||
if (ok) { | ||
return; | ||
} | ||
|
||
// retry | ||
int waitInMilliseconds = pow(multiplier, _retries).toInt() * initialBackoffTimeMs; | ||
await Future.delayed(Duration(milliseconds: waitInMilliseconds)); | ||
_retries++; | ||
if (_retries >= maxRetries) { | ||
debugPrint("[Socket] Reach max retries $maxRetries"); | ||
_listener?.onMaxRetriesReach(); | ||
return; | ||
} | ||
_reconnect(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _reconnect
method is not handling the case when the connection fails. If the connection fails, it will still increment the retries and potentially enter an infinite loop of failed connections. You should add error handling to this method.
void _reconnect() async {
const int initialBackoffTimeMs = 1000; // 1 second
const double multiplier = 1.5;
const int maxRetries = 7;
if (_status == PureSocketStatus.connecting || _status == PureSocketStatus.connected) {
debugPrint("[Socket] Can not reconnect, because socket is $_status");
return;
}
await _cleanUp();
var ok = await _connect();
if (ok) {
return;
}
// retry
int waitInMilliseconds = pow(multiplier, _retries).toInt() * initialBackoffTimeMs;
await Future.delayed(Duration(milliseconds: waitInMilliseconds));
_retries++;
if (_retries >= maxRetries) {
debugPrint("[Socket] Reach max retries $maxRetries");
_listener?.onMaxRetriesReach();
return;
}
_reconnect();
}
app/lib/utils/pure_socket.dart
Outdated
@override | ||
void onInternetSatusChanged(InternetStatus status) { | ||
_internetStatus = status; | ||
switch (status) { | ||
case InternetStatus.connected: | ||
if (_status == PureSocketStatus.connected || _status == PureSocketStatus.connecting) { | ||
return; | ||
} | ||
_reconnect(); | ||
break; | ||
case InternetStatus.disconnected: | ||
var that = this; | ||
_internetLostDelayTimer?.cancel(); | ||
_internetLostDelayTimer = Timer(const Duration(seconds: 60), () async { | ||
if (_internetStatus != InternetStatus.disconnected) { | ||
return; | ||
} | ||
|
||
await that.disconnect(); | ||
_listener?.onInternetConnectionFailed(); | ||
}); | ||
|
||
break; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onInternetSatusChanged
method is not handling the case when the internet status changes while a reconnection attempt is in progress. This could lead to unexpected behavior. You should add a check for this condition.
@override
void onInternetSatusChanged(InternetStatus status) {
_internetStatus = status;
switch (status) {
case InternetStatus.connected:
if (_status == PureSocketStatus.connected || _status == PureSocketStatus.connecting) {
return;
}
_reconnect();
break;
case InternetStatus.disconnected:
var that = this;
_internetLostDelayTimer?.cancel();
_internetLostDelayTimer = Timer(const Duration(seconds: 60), () async {
if (_internetStatus != InternetStatus.disconnected) {
return;
}
await that.disconnect();
_listener?.onInternetConnectionFailed();
});
break;
}
}
void subscribe(Object context, ITransctipSegmentSocketServiceListener listener) { | ||
_listeners.remove(context.hashCode); | ||
_listeners.putIfAbsent(context.hashCode, () => listener); | ||
} | ||
|
||
void unsubscribe(Object context) { | ||
_listeners.remove(context.hashCode); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The subscribe
and unsubscribe
methods are using the hash code of the context object as a key. This could lead to unexpected behavior if two different objects have the same hash code. You should use the context object itself as the key.
void subscribe(Object context, ITransctipSegmentSocketServiceListener listener) {
_listeners.remove(context);
_listeners.putIfAbsent(context, () => listener);
}
void unsubscribe(Object context) {
_listeners.remove(context);
}
void onMessage(event) { | ||
debugPrint("[TranscriptSegmentService] onMessage ${event}"); | ||
if (event == 'ping') return; | ||
|
||
// Decode json | ||
dynamic jsonEvent; | ||
try { | ||
jsonEvent = jsonDecode(event); | ||
} on FormatException catch (e) { | ||
debugPrint(e.toString()); | ||
} | ||
if (jsonEvent == null) { | ||
debugPrint("Can not decode message event json $event"); | ||
return; | ||
} | ||
|
||
// Transcript segments | ||
if (jsonEvent is List) { | ||
var segments = jsonEvent; | ||
if (segments.isEmpty) { | ||
return; | ||
} | ||
_listeners.forEach((k, v) { | ||
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event); | ||
|
||
// Message event | ||
if (jsonEvent.containsKey("type")) { | ||
var event = ServerMessageEvent.fromJson(jsonEvent); | ||
_listeners.forEach((k, v) { | ||
v.onMessageEventReceived(event); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event.toString()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onMessage
method is not handling the case when the event is neither a list nor contains a "type" field. This could lead to unexpected behavior. You should add error handling for this case.
@override
void onMessage(event) {
debugPrint("[TranscriptSegmentService] onMessage ${event}");
if (event == 'ping') return;
// Decode json
dynamic jsonEvent;
try {
jsonEvent = jsonDecode(event);
} on FormatException catch (e) {
debugPrint(e.toString());
}
if (jsonEvent == null) {
debugPrint("Can not decode message event json $event");
return;
}
// Transcript segments
if (jsonEvent is List) {
var segments = jsonEvent;
if (segments.isEmpty) {
return;
}
_listeners.forEach((k, v) {
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList());
});
return;
}
debugPrint(event);
// Message event
if (jsonEvent.containsKey("type")) {
var event = ServerMessageEvent.fromJson(jsonEvent);
_listeners.forEach((k, v) {
v.onMessageEventReceived(event);
});
return;
}
debugPrint(event.toString());
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Files selected (1)
- app/lib/utils/pure_socket.dart (1)
Files not summarized due to errors (1)
- app/lib/utils/pure_socket.dart (diff tokens exceeds limit)
Review comments generated (1)
- Review: 1
- LGTM: 0
Tips
Chat with Entelligence.AI Bot (@Entelligence.AI
)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging
@Entelligence.AI
in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add
@Entelligence.AI: ignore
anywhere in the PR description to pause further reviews from the bot.
} | ||
|
||
Future stop({String? reason}) async { | ||
await _socket.stop(); | ||
_listeners.clear(); | ||
|
||
if (reason != null) { | ||
debugPrint(reason); | ||
} | ||
} | ||
|
||
Future send(dynamic message) async { | ||
_socket.send(message); | ||
return; | ||
} | ||
|
||
@override | ||
void onClosed() { | ||
_listeners.forEach((k, v) { | ||
v.onClosed(); | ||
}); | ||
} | ||
|
||
@override | ||
void onError(Object err, StackTrace trace) { | ||
_listeners.forEach((k, v) { | ||
v.onError(err); | ||
}); | ||
} | ||
|
||
@override | ||
void onMessage(event) { | ||
debugPrint("[TranscriptSegmentService] onMessage ${event}"); | ||
if (event == 'ping') return; | ||
|
||
// Decode json | ||
dynamic jsonEvent; | ||
try { | ||
jsonEvent = jsonDecode(event); | ||
} on FormatException catch (e) { | ||
debugPrint(e.toString()); | ||
} | ||
if (jsonEvent == null) { | ||
debugPrint("Can not decode message event json $event"); | ||
return; | ||
} | ||
|
||
// Transcript segments | ||
if (jsonEvent is List) { | ||
var segments = jsonEvent; | ||
if (segments.isEmpty) { | ||
return; | ||
} | ||
_listeners.forEach((k, v) { | ||
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event); | ||
|
||
// Message event | ||
if (jsonEvent.containsKey("type")) { | ||
var event = ServerMessageEvent.fromJson(jsonEvent); | ||
_listeners.forEach((k, v) { | ||
v.onMessageEventReceived(event); | ||
}); | ||
return; | ||
} | ||
|
||
debugPrint(event.toString()); | ||
} | ||
|
||
@override | ||
void onInternetConnectionFailed() { | ||
debugPrint("onInternetConnectionFailed"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(3); | ||
NotificationService.instance.createNotification( | ||
notificationId: 3, | ||
title: 'Internet Connection Lost', | ||
body: 'Your device is offline. Transcription is paused until connection is restored.', | ||
); | ||
} | ||
|
||
@override | ||
void onMaxRetriesReach() { | ||
debugPrint("onMaxRetriesReach"); | ||
|
||
// Send notification | ||
NotificationService.instance.clearNotification(2); | ||
NotificationService.instance.createNotification( | ||
notificationId: 2, | ||
title: 'Connection Issue 🚨', | ||
body: 'Unable to connect to the transcript service.' | ||
' Please restart the app or contact support if the problem persists.', | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PureSocket
class and its related classes seem to be well implemented with proper error handling, connection management, and retry logic. However, there are a few areas that could be improved:
-
In the
connect
method (lines 106-145), it would be better to handle exceptions that might occur during the connection process. This can help prevent unhandled exceptions from propagating up the call stack. -
The
send
method (line 192) doesn't check if the socket is connected before sending a message. This could lead to an exception if the socket is not connected. -
The
onInternetSatusChanged
method (lines 226-250) seems to have a potential issue. If the internet status changes to disconnected, it starts a timer to disconnect the socket after 60 seconds. However, if the internet status changes back to connected within these 60 seconds, the timer is not cancelled, which means the socket will still be disconnected after the timer expires. -
The
TranscripSegmentSocketService
class (lines 276-416) does not seem to handle the case where the listener is null. It's always a good idea to check for null before invoking methods on an object.
Here are the suggested fixes:
Future<bool> connect() async {
try {
return await _connect();
} catch (e) {
print('Error connecting to socket: $e');
return false;
}
}
void send(message) {
if (_status != PureSocketStatus.connected) {
print('Cannot send message. Socket is not connected.');
return;
}
_channel?.sink.add(message);
}
void onInternetSatusChanged(InternetStatus status) {
debugPrint("[Socket] Internet connection changed $status");
_internetStatus = status;
switch (status) {
case InternetStatus.connected:
_internetLostDelayTimer?.cancel();
if (_status == PureSocketStatus.connected || _status == PureSocketStatus.connecting) {
return;
}
_reconnect();
break;
@override
void onClosed() {
if (_listener != null) {
_listeners.forEach((k, v) {
v.onClosed();
});
}
}
@override
void onError(Object err, StackTrace trace) {
if (_listener != null) {
_listeners.forEach((k, v) {
v.onError(err);
});
}
}
@override
void onMessage(event) {
if (_listener != null) {
debugPrint("[TranscriptSegmentService] onMessage ${event}");
if (event == 'ping') return;
// ... rest of the code ...
}
}
Issue: #865
Summary by Entelligence.AI
WebSocketProvider
dependency fromDeviceProvider
, improving the modularity and maintainability of the code.SocketServicePool
for managing memory sockets, enhancing performance and preventing race conditions.SpeechProfileProvider
andDeviceProvider
to use new socket services, streamlining the handling of speech profile processing._onReceiveTaskData
method to ensure widget is mounted before setting geolocation data, preventing potential crashes.WebSocketProvider
class as part of a shift towards more efficient socket management.